diff options
Diffstat (limited to 'contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java')
-rwxr-xr-x | contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java | 51 |
1 files changed, 32 insertions, 19 deletions
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java index cd732a61c..011c9bc58 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java @@ -28,12 +28,12 @@ import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.util.Calendar; +import java.util.List; import java.util.TimeZone; import javax.sql.DataSource; import org.apache.drill.common.AutoCloseables; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -63,7 +63,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; @SuppressWarnings("unchecked") class JdbcRecordReader extends AbstractRecordReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory .getLogger(JdbcRecordReader.class); private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS; @@ -75,11 +75,13 @@ class JdbcRecordReader extends AbstractRecordReader { private final String sql; private ImmutableList<ValueVector> vectors; private ImmutableList<Copier<?>> copiers; + private final List<String> columns; - public JdbcRecordReader(DataSource source, String sql, String storagePluginName) { + public JdbcRecordReader(DataSource source, String sql, String storagePluginName, List<String> columns) { this.source = source; this.sql = sql; this.storagePluginName = storagePluginName; + this.columns = columns; } static { @@ -180,22 +182,35 @@ class JdbcRecordReader extends AbstractRecordReader { } @Override - public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { + public void setup(OperatorContext operatorContext, OutputMutator output) { try { connection = source.getConnection(); statement = connection.createStatement(); resultSet = statement.executeQuery(sql); - final ResultSetMetaData meta = resultSet.getMetaData(); - final int columns = meta.getColumnCount(); + ResultSetMetaData meta = resultSet.getMetaData(); + int columnsCount = meta.getColumnCount(); + if (columns.size() != columnsCount) { + throw UserException + .validationError() + .message( + "Expected columns count differs from the returned one.\n" + + "Expected columns: %s\n" + + "Returned columns count: %s", + columns, columnsCount) + .addContext("sql", sql) + .addContext("plugin", storagePluginName) + .build(logger); + } ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder(); ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder(); - for (int i = 1; i <= columns; i++) { - final String name = meta.getColumnLabel(i); - final int jdbcType = meta.getColumnType(i); - final int width = meta.getPrecision(i); - final int scale = meta.getScale(i); + for (int i = 1; i <= columnsCount; i++) { + String name = columns.get(i - 1); + // column index in ResultSetMetaData starts from 1 + int jdbcType = meta.getColumnType(i); + int width = meta.getPrecision(i); + int scale = meta.getScale(i); MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType); if (minorType == null) { @@ -214,14 +229,14 @@ class JdbcRecordReader extends AbstractRecordReader { continue; } - final MajorType type = MajorType.newBuilder() + MajorType type = MajorType.newBuilder() .setMode(TypeProtos.DataMode.OPTIONAL) .setMinorType(minorType) .setScale(scale) .setPrecision(width) .build(); - final MaterializedField field = MaterializedField.create(name, type); - final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( + MaterializedField field = MaterializedField.create(name, type); + Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( minorType, type.getMode()); ValueVector vector = output.addField(field, clazz); vectorBuilder.add(vector); @@ -245,13 +260,11 @@ class JdbcRecordReader extends AbstractRecordReader { @Override public int next() { int counter = 0; - Boolean b = true; try { - while (counter < 4095 && b) { // loop at 4095 since nullables use one more than record count and we + while (counter < 4095) { // loop at 4095 since nullables use one more than record count and we // allocate on powers of two. - b = resultSet.next(); - if (!b) { - break; + if (!resultSet.next()) { + break; } for (Copier<?> c : copiers) { c.copy(counter); |