diff options
author | Volodymyr Vysotskyi <vvovyk@gmail.com> | 2019-01-10 20:47:08 +0200 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2019-01-18 17:52:02 +0200 |
commit | 95d91f40c5991720b6f9d1cd2147edf58c8b136f (patch) | |
tree | f2833333bf340d1ae301c06ccb93428219af51cc /contrib | |
parent | da7cb4e2fc67393f0ef26b0a03954ad139fdf7f9 (diff) |
DRILL-6969: Fix inconsistency of reading MaprDB JSON tables using hive plugin when native reader is enabled
closes #1610
Diffstat (limited to 'contrib')
6 files changed, 133 insertions, 29 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java index 07943f6f7..c17696c0f 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java @@ -32,6 +32,7 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { public boolean ignoreSchemaChange = false; public boolean readAllNumbersAsDouble = false; public boolean disableCountOptimization = false; + public boolean readTimestampWithZoneOffset = false; /* This flag is a switch to do special handling in case of * no columns in the query exists in the maprdb table. This flag * can get deprecated once it is observed that this special handling @@ -48,6 +49,7 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { result = 31 * result + (readAllNumbersAsDouble ? 1231 : 1237); result = 31 * result + (disableCountOptimization ? 1231 : 1237); result = 31 * result + (nonExistentFieldSupport ? 1231 : 1237); + result = 31 * result + (readTimestampWithZoneOffset ? 1231 : 1237); return result; } @@ -68,6 +70,8 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { return false; } else if (!index.equals(other.index)) { return false; + } else if (readTimestampWithZoneOffset != other.readTimestampWithZoneOffset) { + return false; } return true; } @@ -76,6 +80,10 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { return readAllNumbersAsDouble; } + public boolean isReadTimestampWithZoneOffset() { + return readTimestampWithZoneOffset; + } + public boolean isAllTextMode() { return allTextMode; } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java index a4cb0bd07..d9db7bccd 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/CompareFunctionsProcessor.java @@ -35,6 +35,7 @@ import org.apache.drill.common.expression.ValueExpressions.TimeExpression; import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.ValueExpressions.VarDecimalExpression; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.joda.time.LocalTime; import org.ojai.Value; import org.ojai.types.ODate; @@ -42,7 +43,6 @@ import org.ojai.types.OTime; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; -import com.mapr.db.rowcol.KeyValueBuilder; import com.mapr.db.util.SqlHelper; import org.ojai.types.OTimestamp; @@ -51,8 +51,8 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr private String functionName; private Boolean success; - private Value value; - private SchemaPath path; + protected Value value; + protected SchemaPath path; public CompareFunctionsProcessor(String functionName) { this.functionName = functionName; @@ -69,13 +69,45 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr return false; } + /** + * Converts specified function call to be pushed into maprDB JSON scan. + * + * @param call function call to be pushed + * @return CompareFunctionsProcessor instance which contains converted function call + */ public static CompareFunctionsProcessor process(FunctionCall call) { + return processWithEvaluator(call, new CompareFunctionsProcessor(call.getName())); + } + + /** + * Converts specified function call to be pushed into maprDB JSON scan. + * For the case when timestamp value is used, it is converted to UTC timezone + * before converting to {@link OTimestamp} instance. + * + * @param call function call to be pushed + * @return CompareFunctionsProcessor instance which contains converted function call + */ + public static CompareFunctionsProcessor processWithTimeZoneOffset(FunctionCall call) { + CompareFunctionsProcessor processor = new CompareFunctionsProcessor(call.getName()) { + @Override + protected boolean visitTimestampExpr(SchemaPath path, TimeStampExpression valueArg) { + // converts timestamp value from local time zone to UTC since the record reader + // reads the timestamp in local timezone if the readTimestampWithZoneOffset flag is enabled + long timeStamp = valueArg.getTimeStamp() - DateUtility.TIMEZONE_OFFSET_MILLIS; + this.value = KeyValueBuilder.initFrom(new OTimestamp(timeStamp)); + this.path = path; + return true; + } + }; + return processWithEvaluator(call, processor); + } + + private static CompareFunctionsProcessor processWithEvaluator(FunctionCall call, CompareFunctionsProcessor evaluator) { String functionName = call.getName(); LogicalExpression nameArg = call.args.get(0); - LogicalExpression valueArg = call.args.size() >= 2? call.args.get(1) : null; - CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName); + LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null; - //if (valueArg != null) { + if (valueArg != null) { if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) { LogicalExpression swapArg = valueArg; valueArg = nameArg; @@ -83,7 +115,7 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName); } evaluator.success = nameArg.accept(evaluator, valueArg); - //} + } return evaluator; } @@ -187,14 +219,17 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr } if (valueArg instanceof TimeStampExpression) { - this.value = KeyValueBuilder.initFrom(new OTimestamp(((TimeStampExpression)valueArg).getTimeStamp())); - this.path = path; - return true; + return visitTimestampExpr(path, (TimeStampExpression) valueArg); } - return false; } + protected boolean visitTimestampExpr(SchemaPath path, TimeStampExpression valueArg) { + this.value = KeyValueBuilder.initFrom(new OTimestamp(valueArg.getTimeStamp())); + this.path = path; + return true; + } + private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES; static { ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder(); diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java index 92d40f789..252bc538e 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonConditionBuilder.java @@ -74,7 +74,12 @@ public class JsonConditionBuilder extends AbstractExprVisitor<JsonScanSpec, Void ImmutableList<LogicalExpression> args = call.args; if (CompareFunctionsProcessor.isCompareFunction(functionName)) { - CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call); + CompareFunctionsProcessor processor; + if (groupScan.getFormatPlugin().getConfig().isReadTimestampWithZoneOffset()) { + processor = CompareFunctionsProcessor.processWithTimeZoneOffset(call); + } else { + processor = CompareFunctionsProcessor.process(call); + } if (processor.isSuccess()) { nodeScanSpec = createJsonScanSpec(call, processor); } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index f13d64d9e..081d8fde1 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -33,6 +33,7 @@ import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; @@ -43,6 +44,7 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; import org.apache.drill.exec.util.EncodedSchemaPathSet; import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils; +import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.hadoop.fs.Path; import org.ojai.Document; @@ -60,8 +62,9 @@ import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; -import org.apache.drill.shaded.guava.com.google.common.base.Predicate; +import java.time.Instant; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -112,6 +115,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private final boolean unionEnabled; private final boolean readNumbersAsDouble; + private final boolean readTimestampWithZoneOffset; private boolean disablePushdown; private final boolean allTextMode; private final boolean ignoreSchemaChange; @@ -156,6 +160,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { setColumns(projectedColumns); unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); readNumbersAsDouble = formatPlugin.getConfig().isReadAllNumbersAsDouble(); + readTimestampWithZoneOffset = formatPlugin.getConfig().isReadTimestampWithZoneOffset(); allTextMode = formatPlugin.getConfig().isAllTextMode(); ignoreSchemaChange = formatPlugin.getConfig().isIgnoreSchemaChange(); disablePushdown = !formatPlugin.getConfig().isEnablePushdown(); @@ -284,16 +289,53 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { throw new ExecutionSetupException(ex); } } - /* - * Setup the valueWriter and documentWriters based on config options + + /** + * Setup the valueWriter and documentWriters based on config options. */ private void setupWriter() { if (allTextMode) { - valueWriter = new AllTextValueWriter(buffer); + if (readTimestampWithZoneOffset) { + valueWriter = new AllTextValueWriter(buffer) { + /** + * Applies local time zone offset to timestamp value read using specified {@code reader}. + * + * @param writer writer to store string representation of timestamp value + * @param fieldName name of the field + * @param reader document reader + */ + @Override + protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) { + String formattedTimestamp = Instant.ofEpochMilli(reader.getTimestampLong()) + .atOffset(OffsetDateTime.now().getOffset()).format(DateUtility.UTC_FORMATTER); + writeString(writer, fieldName, formattedTimestamp); + } + }; + } else { + valueWriter = new AllTextValueWriter(buffer); + } } else if (readNumbersAsDouble) { - valueWriter = new NumbersAsDoubleValueWriter(buffer); + if (readTimestampWithZoneOffset) { + valueWriter = new NumbersAsDoubleValueWriter(buffer) { + @Override + protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) { + writeTimestampWithLocalZoneOffset(writer, fieldName, reader); + } + }; + } else { + valueWriter = new NumbersAsDoubleValueWriter(buffer); + } } else { - valueWriter = new OjaiValueWriter(buffer); + if (readTimestampWithZoneOffset) { + valueWriter = new OjaiValueWriter(buffer) { + @Override + protected void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) { + writeTimestampWithLocalZoneOffset(writer, fieldName, reader); + } + }; + } else { + valueWriter = new OjaiValueWriter(buffer); + } } if (projectWholeDocument) { @@ -307,6 +349,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } } + /** + * Applies local time zone offset to timestamp value read using specified {@code reader}. + * + * @param writer writer to store timestamp value + * @param fieldName name of the field + * @param reader document reader + */ + private void writeTimestampWithLocalZoneOffset(MapOrListWriterImpl writer, String fieldName, DocumentReader reader) { + long timestamp = reader.getTimestampLong() + DateUtility.TIMEZONE_OFFSET_MILLIS; + writer.timeStamp(fieldName).writeTimeStamp(timestamp); + } + @Override public int next() { Stopwatch watch = Stopwatch.createUnstarted(); @@ -387,7 +441,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } if (nonExistentColumnsProjection && recordCount > 0) { - JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.EMPTY_LIST); + JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.emptyList()); } vectorWriter.setValueCount(recordCount); if (maxRecordsToRead > 0) { @@ -463,12 +517,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } public static boolean includesIdField(Collection<FieldPath> projected) { - return Iterables.tryFind(projected, new Predicate<FieldPath>() { - @Override - public boolean apply(FieldPath path) { - return Preconditions.checkNotNull(path).equals(ID_FIELD); - } - }).isPresent(); + return Iterables.tryFind(projected, path -> Preconditions.checkNotNull(path).equals(ID_FIELD)).isPresent(); } @Override diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java index e521c64d0..efb70aa1b 100644 --- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java +++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestScanRanges.java @@ -82,8 +82,8 @@ public class TestScanRanges extends BaseJsonTest { table.flush(); DBTests.waitForRowCount(table.getPath(), TOTAL_ROW_COUNT); - setSessionOption("planner.width.max_per_node", "5"); - } + setSessionOption("planner.width.max_per_node", 5); + } } @AfterClass diff --git a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java index b8c267589..7aca59d58 100644 --- a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java +++ b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java @@ -23,8 +23,11 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.hive.HiveMetadataProvider; import org.apache.drill.exec.store.hive.HiveReadEntry; @@ -75,6 +78,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi public void onMatch(RelOptRuleCall call) { try { DrillScanRel hiveScanRel = call.rel(0); + PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan(); HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry(); @@ -90,7 +94,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi "partitions"); } - DrillScanRel nativeScanRel = createNativeScanRel(hiveScanRel); + DrillScanRel nativeScanRel = createNativeScanRel(hiveScanRel, settings); call.transformTo(nativeScanRel); /* @@ -110,7 +114,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi /** * Helper method which creates a DrillScanRel with native Drill HiveScan. */ - private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) { + private DrillScanRel createNativeScanRel(DrillScanRel hiveScanRel, PlannerSettings settings) { RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory(); HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan(); Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters(); @@ -129,6 +133,9 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi hiveScanCols ); + nativeMapRDBScan.getFormatPlugin().getConfig().readTimestampWithZoneOffset = + settings.getOptions().getBoolean(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET); + List<String> nativeScanColNames = hiveScanRel.getRowType().getFieldList().stream() .map(field -> replaceOverriddenColumnId(parameters, field.getName())) .collect(Collectors.toList()); |