about | summary | refs | log | tree | commit | diff
path: root/contrib
diff options
context:
space:
mode:
author: Volodymyr Vysotskyi <vvovyk@gmail.com> 2019-01-10 20:47:08 +0200
committer: Vitalii Diravka <vitalii.diravka@gmail.com> 2019-01-18 17:52:02 +0200
commit: 95d91f40c5991720b6f9d1cd2147edf58c8b136f (patch)
tree: f2833333bf340d1ae301c06ccb93428219af51cc /contrib
parent: da7cb4e2fc67393f0ef26b0a03954ad139fdf7f9 (diff)
DRILL-6969: Fix inconsistency of reading MaprDB JSON tables using hive plugin when native reader is enabled
closes #1610
Diffstat (limited to 'contrib')
-rw-r--r-- contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java 8
-rw-r--r-- contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java 57
-rw-r--r-- contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java 7
-rw-r--r-- contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java 75
-rw-r--r-- contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java 4
-rw-r--r-- contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java 11
6 files changed, 133 insertions, 29 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
index 07943f6f7..c17696c0f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
@@ -32,6 +32,7 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
public boolean ignoreSchemaChange = false;
public boolean readAllNumbersAsDouble = false;
public boolean disableCountOptimization = false;
+ public boolean readTimestampWithZoneOffset = false;
/* This flag is a switch to do special handling in case of
* no columns in the query exists in the maprdb table. This flag
* can get deprecated once it is observed that this special handling
@@ -48,6 +49,7 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
result = 31 * result + (readAllNumbersAsDouble ? 1231 : 1237);
result = 31 * result + (disableCountOptimization ? 1231 : 1237);
result = 31 * result + (nonExistentFieldSupport ? 1231 : 1237);
+ result = 31 * result + (readTimestampWithZoneOffset ? 1231 : 1237);
return result;
}
@@ -68,6 +70,8 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
return false;
} else if (!index.equals(other.index)) {
return false;
+ } else if (readTimestampWithZoneOffset != other.readTimestampWithZoneOffset) {
+ return false;
}
return true;
}
@@ -76,6 +80,10 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
return readAllNumbersAsDouble;
}
+ public boolean isReadTimestampWithZoneOffset() {
+ return readTimestampWithZoneOffset;
+ }
+
public boolean isAllTextMode() {
return allTextMode;
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
index a4cb0bd07..d9db7bccd 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.ValueExpressions.VarDecimalExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.joda.time.LocalTime;
import org.ojai.Value;
import org.ojai.types.ODate;
@@ -42,7 +43,6 @@ import org.ojai.types.OTime;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import com.mapr.db.rowcol.KeyValueBuilder;
import com.mapr.db.util.SqlHelper;
import org.ojai.types.OTimestamp;
@@ -51,8 +51,8 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
private String functionName;
private Boolean success;
- private Value value;
- private SchemaPath path;
+ protected Value value;
+ protected SchemaPath path;
public CompareFunctionsProcessor(String functionName) {
this.functionName = functionName;
@@ -69,13 +69,45 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
return false;
}
+ /**
+ * Converts specified function call to be pushed into maprDB JSON scan.
+ *
+ * @param call function call to be pushed
+ * @return CompareFunctionsProcessor instance which contains converted function call
+ */
public static CompareFunctionsProcessor process(FunctionCall call) {
+ return processWithEvaluator(call, new CompareFunctionsProcessor(call.getName()));
+ }
+
+ /**
+ * Converts specified function call to be pushed into maprDB JSON scan.
+ * For the case when timestamp value is used, it is converted to UTC timezone
+ * before converting to {@link OTimestamp} instance.
+ *
+ * @param call function call to be pushed
+ * @return CompareFunctionsProcessor instance which contains converted function call
+ */
+ public static CompareFunctionsProcessor processWithTimeZoneOffset(FunctionCall call) {
+ CompareFunctionsProcessor processor = new CompareFunctionsProcessor(call.getName()) {
+ @Override
+ protected boolean visitTimestampExpr(SchemaPath path, TimeStampExpression valueArg) {
+ // converts timestamp value from local time zone to UTC since the record reader
+ // reads the timestamp in local timezone if the readTimestampWithZoneOffset flag is enabled
+ long timeStamp = valueArg.getTimeStamp() - DateUtility.TIMEZONE_OFFSET_MILLIS;
+ this.value = KeyValueBuilder.initFrom(new OTimestamp(timeStamp));
+ this.path = path;
+ return true;
+ }
+ };
+ return processWithEvaluator(call, processor);
+ }
+
+ private static CompareFunctionsProcessor processWithEvaluator(FunctionCall call, CompareFunctionsProcessor evaluator) {
String functionName = call.getName();
LogicalExpression nameArg = call.args.get(0);
- LogicalExpression valueArg = call.args.size() >= 2? call.args.get(1) : null;
- CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
+ LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
- //if (valueArg != null) {
+ if (valueArg != null) {
if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
LogicalExpression swapArg = valueArg;
valueArg = nameArg;
@@ -83,7 +115,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
}
evaluator.success = nameArg.accept(evaluator, valueArg);
- //}
+ }
return evaluator;
}
@@ -187,14 +219,17 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
}
if (valueArg instanceof TimeStampExpression) {
- this.value = KeyValueBuilder.initFrom(new OTimestamp(((TimeStampExpression)valueArg).getTimeStamp()));
- this.path = path;
- return true;
+ return visitTimestampExpr(path, (TimeStampExpression) valueArg);
}
-
return false;
}
+ protected boolean visitTimestampExpr(SchemaPath path, TimeStampExpression valueArg) {
+ this.value = KeyValueBuilder.initFrom(new OTimestamp(valueArg.getTimeStamp()));
+ this.path = path;
+ return true;
+ }
+
private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
static {
ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
index 92d40f789..252bc538e 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java
@@ -74,7 +74,12 @@ public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void
ImmutableList<LogicalExpression> args = call.args;
if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
- CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call);
+ CompareFunctionsProcessor processor;
+ if (groupScan.getFormatPlugin().getConfig().isReadTimestampWithZoneOffset()) {
+ processor = CompareFunctionsProcessor.processWithTimeZoneOffset(call);
+ } else {
+ processor = CompareFunctionsProcessor.process(call);
+ }
if (processor.isSuccess()) {
nodeScanSpec = createJsonScanSpec(call, processor);
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index f13d64d9e..081d8fde1 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
@@ -43,6 +44,7 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.util.EncodedSchemaPathSet;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.hadoop.fs.Path;
import org.ojai.Document;
@@ -60,8 +62,9 @@ import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
+import java.time.Instant;
+import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -112,6 +115,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private final boolean unionEnabled;
private final boolean readNumbersAsDouble;
+ private final boolean readTimestampWithZoneOffset;
private boolean disablePushdown;
private final boolean allTextMode;
private final boolean ignoreSchemaChange;
@@ -156,6 +160,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
setColumns(projectedColumns);
unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
readNumbersAsDouble = formatPlugin.getConfig().isReadAllNumbersAsDouble();
+ readTimestampWithZoneOffset = formatPlugin.getConfig().isReadTimestampWithZoneOffset();
allTextMode = formatPlugin.getConfig().isAllTextMode();
ignoreSchemaChange = formatPlugin.getConfig().isIgnoreSchemaChange();
disablePushdown = !formatPlugin.getConfig().isEnablePushdown();
@@ -284,16 +289,53 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
throw new ExecutionSetupException(ex);
}
}
- /*
- * Setup the valueWriter and documentWriters based on config options
+
+ /**
+ * Setup the valueWriter and documentWriters based on config options.
*/
private void setupWriter() {
if (allTextMode) {
- valueWriter = new AllTextValueWriter(buffer);
+ if (readTimestampWithZoneOffset) {
+ valueWriter = new AllTextValueWriter(buffer) {
+ /**
+ * Applies local time zone offset to timestamp value read using specified {@code reader}.
+ *
+ * @param writer writer to store string representation of timestamp value
+ * @param fieldName name of the field
+ * @param reader document reader
+ */
+ @Override
+ protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ String formattedTimestamp = Instant.ofEpochMilli(reader.getTimestampLong())
+ .atOffset(OffsetDateTime.now().getOffset()).format(DateUtility.UTC_FORMATTER);
+ writeString(writer, fieldName, formattedTimestamp);
+ }
+ };
+ } else {
+ valueWriter = new AllTextValueWriter(buffer);
+ }
} else if (readNumbersAsDouble) {
- valueWriter = new NumbersAsDoubleValueWriter(buffer);
+ if (readTimestampWithZoneOffset) {
+ valueWriter = new NumbersAsDoubleValueWriter(buffer) {
+ @Override
+ protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeTimestampWithLocalZoneOffset(writer, fieldName, reader);
+ }
+ };
+ } else {
+ valueWriter = new NumbersAsDoubleValueWriter(buffer);
+ }
} else {
- valueWriter = new OjaiValueWriter(buffer);
+ if (readTimestampWithZoneOffset) {
+ valueWriter = new OjaiValueWriter(buffer) {
+ @Override
+ protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ writeTimestampWithLocalZoneOffset(writer, fieldName, reader);
+ }
+ };
+ } else {
+ valueWriter = new OjaiValueWriter(buffer);
+ }
}
if (projectWholeDocument) {
@@ -307,6 +349,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
}
+ /**
+ * Applies local time zone offset to timestamp value read using specified {@code reader}.
+ *
+ * @param writer writer to store timestamp value
+ * @param fieldName name of the field
+ * @param reader document reader
+ */
+ private void writeTimestampWithLocalZoneOffset(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) {
+ long timestamp = reader.getTimestampLong() + DateUtility.TIMEZONE_OFFSET_MILLIS;
+ writer.timeStamp(fieldName).writeTimeStamp(timestamp);
+ }
+
@Override
public int next() {
Stopwatch watch = Stopwatch.createUnstarted();
@@ -387,7 +441,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
if (nonExistentColumnsProjection && recordCount > 0) {
- JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.EMPTY_LIST);
+ JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.emptyList());
}
vectorWriter.setValueCount(recordCount);
if (maxRecordsToRead > 0) {
@@ -463,12 +517,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
public static boolean includesIdField(Collection<FieldPath> projected) {
- return Iterables.tryFind(projected, new Predicate<FieldPath>() {
- @Override
- public boolean apply(FieldPath path) {
- return Preconditions.checkNotNull(path).equals(ID_FIELD);
- }
- }).isPresent();
+ return Iterables.tryFind(projected, path -> Preconditions.checkNotNull(path).equals(ID_FIELD)).isPresent();
}
@Override
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java
index e521c64d0..efb70aa1b 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java
@@ -82,8 +82,8 @@ public class TestScanRanges extends BaseJsonTest {
table.flush();
DBTests.waitForRowCount(table.getPath(), TOTAL_ROW_COUNT);
- setSessionOption("planner.width.max_per_node", "5");
- }
+ setSessionOption("planner.width.max_per_node", 5);
+ }
}
@AfterClass
diff --git a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index b8c267589..7aca59d58 100644
--- a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -23,8 +23,11 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveReadEntry;
@@ -75,6 +78,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
public void onMatch(RelOptRuleCall call) {
try {
DrillScanRel hiveScanRel = call.rel(0);
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
@@ -90,7 +94,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
"partitions");
}
- DrillScanRel nativeScanRel = createNativeScanRel(hiveScanRel);
+ DrillScanRel nativeScanRel = createNativeScanRel(hiveScanRel, settings);
call.transformTo(nativeScanRel);
/*
@@ -110,7 +114,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
/**
* Helper method which creates a DrillScanRel with native Drill HiveScan.
*/
- private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) {
+ private DrillScanRel createNativeScanRel(DrillScanRel hiveScanRel, PlannerSettings settings) {
RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters();
@@ -129,6 +133,9 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
hiveScanCols
);
+ nativeMapRDBScan.getFormatPlugin().getConfig().readTimestampWithZoneOffset =
+ settings.getOptions().getBoolean(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET);
+
List<String> nativeScanColNames = hiveScanRel.getRowType().getFieldList().stream()
.map(field -> replaceOverriddenColumnId(parameters, field.getName()))
.collect(Collectors.toList());