aboutsummaryrefslogtreecommitdiff
path: root/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java')
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java51
1 files changed, 32 insertions, 19 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index cd732a61c..011c9bc58 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -28,12 +28,12 @@ import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
+import java.util.List;
import java.util.TimeZone;
import javax.sql.DataSource;
import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -63,7 +63,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
@SuppressWarnings("unchecked")
class JdbcRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
.getLogger(JdbcRecordReader.class);
private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
@@ -75,11 +75,13 @@ class JdbcRecordReader extends AbstractRecordReader {
private final String sql;
private ImmutableList<ValueVector> vectors;
private ImmutableList<Copier<?>> copiers;
+ private final List<String> columns;
- public JdbcRecordReader(DataSource source, String sql, String storagePluginName) {
+ public JdbcRecordReader(DataSource source, String sql, String storagePluginName, List<String> columns) {
this.source = source;
this.sql = sql;
this.storagePluginName = storagePluginName;
+ this.columns = columns;
}
static {
@@ -180,22 +182,35 @@ class JdbcRecordReader extends AbstractRecordReader {
}
@Override
- public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext operatorContext, OutputMutator output) {
try {
connection = source.getConnection();
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
- final ResultSetMetaData meta = resultSet.getMetaData();
- final int columns = meta.getColumnCount();
+ ResultSetMetaData meta = resultSet.getMetaData();
+ int columnsCount = meta.getColumnCount();
+ if (columns.size() != columnsCount) {
+ throw UserException
+ .validationError()
+ .message(
+ "Expected columns count differs from the returned one.\n" +
+ "Expected columns: %s\n" +
+ "Returned columns count: %s",
+ columns, columnsCount)
+ .addContext("sql", sql)
+ .addContext("plugin", storagePluginName)
+ .build(logger);
+ }
ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
- for (int i = 1; i <= columns; i++) {
- final String name = meta.getColumnLabel(i);
- final int jdbcType = meta.getColumnType(i);
- final int width = meta.getPrecision(i);
- final int scale = meta.getScale(i);
+ for (int i = 1; i <= columnsCount; i++) {
+ String name = columns.get(i - 1);
+ // column index in ResultSetMetaData starts from 1
+ int jdbcType = meta.getColumnType(i);
+ int width = meta.getPrecision(i);
+ int scale = meta.getScale(i);
MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
if (minorType == null) {
@@ -214,14 +229,14 @@ class JdbcRecordReader extends AbstractRecordReader {
continue;
}
- final MajorType type = MajorType.newBuilder()
+ MajorType type = MajorType.newBuilder()
.setMode(TypeProtos.DataMode.OPTIONAL)
.setMinorType(minorType)
.setScale(scale)
.setPrecision(width)
.build();
- final MaterializedField field = MaterializedField.create(name, type);
- final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
+ MaterializedField field = MaterializedField.create(name, type);
+ Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
minorType, type.getMode());
ValueVector vector = output.addField(field, clazz);
vectorBuilder.add(vector);
@@ -245,13 +260,11 @@ class JdbcRecordReader extends AbstractRecordReader {
@Override
public int next() {
int counter = 0;
- Boolean b = true;
try {
- while (counter < 4095 && b) { // loop at 4095 since nullables use one more than record count and we
+ while (counter < 4095) { // loop at 4095 since nullables use one more than record count and we
// allocate on powers of two.
- b = resultSet.next();
- if (!b) {
- break;
+ if (!resultSet.next()) {
+ break;
}
for (Copier<?> c : copiers) {
c.copy(counter);