author     Volodymyr Vysotskyi <vvovyk@gmail.com>   2019-02-17 23:10:04 +0200
committer  Gautam Parai <gparai@apache.org>         2019-02-22 23:33:03 -0800
commit     110c3578aa18f596278e251d700c8fa9ada8b0c4
tree       f36d4aa11d5faf862b8d276b7651a8d181dcb304
parent     a43839e2147c24700f8a331c6863566abed7a51e
DRILL-6734: JDBC storage plugin returns null for fields without aliases
closes #1642 - Add output column names to JdbcRecordReader and use them for storing the results, since column names in the result set may differ when aliases aren't specified
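
A minimal standalone sketch (not part of this patch) of the mismatch being fixed: without aliases, ResultSetMetaData.getColumnLabel returns whatever label the underlying driver invents (e.g. "COUNT(*)"), while Drill's planner names the same columns EXPR$0, EXPR$1, ..., so vectors created under the driver's labels never matched the plan and those fields came back as null. The in-memory H2 URL and the person table below are illustrative assumptions, and the exact labels printed are driver-specific.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    public class ColumnLabelDemo {
      public static void main(String[] args) throws Exception {
        // Illustrative in-memory H2 database; any JDBC source shows the same effect.
        try (Connection con = DriverManager.getConnection("jdbc:h2:mem:demo");
             Statement st = con.createStatement()) {
          st.execute("CREATE TABLE person (person_id INT)");
          // No aliases on the select list, as in the queries this patch targets.
          try (ResultSet rs = st.executeQuery("SELECT count(*), 1 + 1 FROM person")) {
            ResultSetMetaData meta = rs.getMetaData();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
              // Prints driver-chosen labels such as COUNT(*) -- not the EXPR$N
              // names Drill's planner (rowType.getFieldNames()) expects.
              System.out.println(i + " -> " + meta.getColumnLabel(i));
            }
          }
        }
      }
    }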
-rwxr-xr-x  contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java          3
-rw-r--r--  contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java            28
-rw-r--r--  contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java                  5
-rwxr-xr-x  contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java         51
-rwxr-xr-x  contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java              15
-rw-r--r--  contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java     4
-rw-r--r--  contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java 60
7 files changed, 125 insertions, 41 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
index 76c06e71e..9073f4dd8 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -35,7 +35,8 @@ public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
JdbcStoragePlugin plugin = config.getPlugin();
- RecordReader reader = new JdbcRecordReader(plugin.getSource(), config.getSql(), plugin.getName());
+ RecordReader reader = new JdbcRecordReader(plugin.getSource(),
+ config.getSql(), plugin.getName(), config.getColumns());
return new ScanBatch(config, context, Collections.singletonList(reader));
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
index ed6b674c4..a98193939 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
@@ -39,35 +38,39 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class JdbcGroupScan extends AbstractGroupScan {
private final String sql;
+ private final List<String> columns;
private final JdbcStoragePlugin plugin;
private final double rows;
@JsonCreator
public JdbcGroupScan(
@JsonProperty("sql") String sql,
+ @JsonProperty("columns") List<String> columns,
@JsonProperty("config") StoragePluginConfig config,
@JsonProperty("rows") double rows,
@JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
super("");
this.sql = sql;
+ this.columns = columns;
this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
this.rows = rows;
}
- JdbcGroupScan(String sql, JdbcStoragePlugin plugin, double rows) {
+ JdbcGroupScan(String sql, List<String> columns, JdbcStoragePlugin plugin, double rows) {
super("");
this.sql = sql;
+ this.columns = columns;
this.plugin = plugin;
this.rows = rows;
}
@Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
}
@Override
- public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
- return new JdbcSubScan(sql, plugin);
+ public SubScan getSpecificScan(int minorFragmentId) {
+ return new JdbcSubScan(sql, columns, plugin);
}
@Override
@@ -88,9 +91,13 @@ public class JdbcGroupScan extends AbstractGroupScan {
return sql;
}
+ public List<String> getColumns() {
+ return columns;
+ }
+
@Override
public String getDigest() {
- return sql + String.valueOf(plugin.getConfig());
+ return sql + plugin.getConfig();
}
public StoragePluginConfig getConfig() {
@@ -98,10 +105,7 @@ public class JdbcGroupScan extends AbstractGroupScan {
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
- return new JdbcGroupScan(sql, plugin, rows);
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ return new JdbcGroupScan(sql, columns, plugin, rows);
}
-
-
-
-}
\ No newline at end of file
+}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index 8221367e8..b8229402b 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.jdbc;
-import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -91,8 +90,8 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
}
@Override
- public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- JdbcGroupScan output = new JdbcGroupScan(sql, convention.getPlugin(), rows);
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) {
+ JdbcGroupScan output = new JdbcGroupScan(sql, rowType.getFieldNames(), convention.getPlugin(), rows);
return creator.addMetadata(this, output);
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index cd732a61c..011c9bc58 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -28,12 +28,12 @@ import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
+import java.util.List;
import java.util.TimeZone;
import javax.sql.DataSource;
import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -63,7 +63,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
@SuppressWarnings("unchecked")
class JdbcRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
.getLogger(JdbcRecordReader.class);
private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
@@ -75,11 +75,13 @@ class JdbcRecordReader extends AbstractRecordReader {
private final String sql;
private ImmutableList<ValueVector> vectors;
private ImmutableList<Copier<?>> copiers;
+ private final List<String> columns;
- public JdbcRecordReader(DataSource source, String sql, String storagePluginName) {
+ public JdbcRecordReader(DataSource source, String sql, String storagePluginName, List<String> columns) {
this.source = source;
this.sql = sql;
this.storagePluginName = storagePluginName;
+ this.columns = columns;
}
static {
@@ -180,22 +182,35 @@ class JdbcRecordReader extends AbstractRecordReader {
}
@Override
- public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext operatorContext, OutputMutator output) {
try {
connection = source.getConnection();
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
- final ResultSetMetaData meta = resultSet.getMetaData();
- final int columns = meta.getColumnCount();
+ ResultSetMetaData meta = resultSet.getMetaData();
+ int columnsCount = meta.getColumnCount();
+ if (columns.size() != columnsCount) {
+ throw UserException
+ .validationError()
+ .message(
+ "Expected columns count differs from the returned one.\n" +
+ "Expected columns: %s\n" +
+ "Returned columns count: %s",
+ columns, columnsCount)
+ .addContext("sql", sql)
+ .addContext("plugin", storagePluginName)
+ .build(logger);
+ }
ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
- for (int i = 1; i <= columns; i++) {
- final String name = meta.getColumnLabel(i);
- final int jdbcType = meta.getColumnType(i);
- final int width = meta.getPrecision(i);
- final int scale = meta.getScale(i);
+ for (int i = 1; i <= columnsCount; i++) {
+ String name = columns.get(i - 1);
+ // column index in ResultSetMetaData starts from 1
+ int jdbcType = meta.getColumnType(i);
+ int width = meta.getPrecision(i);
+ int scale = meta.getScale(i);
MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
if (minorType == null) {
@@ -214,14 +229,14 @@ class JdbcRecordReader extends AbstractRecordReader {
continue;
}
- final MajorType type = MajorType.newBuilder()
+ MajorType type = MajorType.newBuilder()
.setMode(TypeProtos.DataMode.OPTIONAL)
.setMinorType(minorType)
.setScale(scale)
.setPrecision(width)
.build();
- final MaterializedField field = MaterializedField.create(name, type);
- final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
+ MaterializedField field = MaterializedField.create(name, type);
+ Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
minorType, type.getMode());
ValueVector vector = output.addField(field, clazz);
vectorBuilder.add(vector);
@@ -245,13 +260,11 @@ class JdbcRecordReader extends AbstractRecordReader {
@Override
public int next() {
int counter = 0;
- Boolean b = true;
try {
- while (counter < 4095 && b) { // loop at 4095 since nullables use one more than record count and we
+ while (counter < 4095) { // loop at 4095 since nullables use one more than record count and we
// allocate on powers of two.
- b = resultSet.next();
- if (!b) {
- break;
+ if (!resultSet.next()) {
+ break;
}
for (Copier<?> c : copiers) {
c.copy(counter);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
index 34d18273e..9bc6de891 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -29,25 +29,31 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.util.List;
+
@JsonTypeName("jdbc-sub-scan")
public class JdbcSubScan extends AbstractSubScan {
private final String sql;
private final JdbcStoragePlugin plugin;
+ private final List<String> columns;
@JsonCreator
public JdbcSubScan(
@JsonProperty("sql") String sql,
+ @JsonProperty("columns") List<String> columns,
@JsonProperty("config") StoragePluginConfig config,
@JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
super("");
this.sql = sql;
+ this.columns = columns;
this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
}
- JdbcSubScan(String sql, JdbcStoragePlugin plugin) {
+ JdbcSubScan(String sql, List<String> columns, JdbcStoragePlugin plugin) {
super("");
this.sql = sql;
+ this.columns = columns;
this.plugin = plugin;
}
@@ -60,6 +66,10 @@ public class JdbcSubScan extends AbstractSubScan {
return sql;
}
+ public List<String> getColumns() {
+ return columns;
+ }
+
public StoragePluginConfig getConfig() {
return plugin.getConfig();
}
@@ -68,5 +78,4 @@ public class JdbcSubScan extends AbstractSubScan {
public JdbcStoragePlugin getPlugin() {
return plugin;
}
-
-}
\ No newline at end of file
+}
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
index a40eec2fa..1da9cf001 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithH2IT.java
@@ -63,7 +63,9 @@ public class TestJdbcPluginWithH2IT extends ClusterTest {
URL scriptFile = TestJdbcPluginWithH2IT.class.getClassLoader().getResource("h2-test-data.sql");
Assert.assertNotNull("Script for test tables generation 'h2-test-data.sql' " +
"cannot be found in test resources", scriptFile);
- RunScript.execute(connection, new FileReader(scriptFile.getFile()));
+ try (FileReader fileReader = new FileReader(scriptFile.getFile())) {
+ RunScript.execute(connection, fileReader);
+ }
}
startCluster(ClusterFixture.builder(dirTestWatcher));
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index 447e76a38..049ee6022 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -189,7 +189,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
null,
null,
null, "XXX")
- .go();
+ .go();
}
@Test
@@ -204,7 +204,7 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
@Test
public void pushdownJoinAndFilterPushDown() throws Exception {
- final String query = "select * from " +
+ String query = "select * from " +
"mysql.`drill_mysql_test`.person e " +
"INNER JOIN " +
"mysql.`drill_mysql_test`.person s " +
@@ -252,4 +252,60 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
assertEquals(1, queryBuilder().sql("describe caseSensitiveTable").run().recordCount());
assertEquals(2, queryBuilder().sql("describe CASESENSITIVETABLE").run().recordCount());
}
+
+ @Test // DRILL-6734
+ public void testExpressionsWithoutAlias() throws Exception {
+ String query = "select count(*), 1+1+2+3+5+8+13+21+34, (1+sqrt(5))/2\n" +
+ "from mysql.`drill_mysql_test`.person";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2")
+ .baselineValues(4L, 88L, 1.618033988749895)
+ .go();
+ }
+
+ @Test // DRILL-6734
+ public void testExpressionsWithoutAliasesPermutations() throws Exception {
+ String query = "select EXPR$1, EXPR$0, EXPR$2\n" +
+ "from (select 1+1+2+3+5+8+13+21+34, (1+sqrt(5))/2, count(*) from mysql.`drill_mysql_test`.person)";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("EXPR$1", "EXPR$0", "EXPR$2")
+ .baselineValues(1.618033988749895, 88L, 4L)
+ .go();
+ }
+
+ @Test // DRILL-6734
+ public void testExpressionsWithAliases() throws Exception {
+ String query = "select person_id as ID, 1+1+2+3+5+8+13+21+34 as FIBONACCI_SUM, (1+sqrt(5))/2 as golden_ratio\n" +
+ "from mysql.`drill_mysql_test`.person limit 2";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("ID", "FIBONACCI_SUM", "golden_ratio")
+ .baselineValues(1, 88L, 1.618033988749895)
+ .baselineValues(2, 88L, 1.618033988749895)
+ .go();
+ }
+
+ @Test // DRILL-6893
+ public void testJoinStar() throws Exception {
+ String query = "select * from (select person_id from mysql.`drill_mysql_test`.person) t1 join " +
+ "(select person_id from mysql.`drill_mysql_test`.person) t2 on t1.person_id = t2.person_id";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("person_id", "person_id0")
+ .baselineValues(1, 1)
+ .baselineValues(2, 2)
+ .baselineValues(3, 3)
+ .baselineValues(5, 5)
+ .go();
+ }
}