diff options
Diffstat (limited to 'exec/java-exec/src/main')
27 files changed, 1986 insertions, 81 deletions
diff --git a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4 b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4 new file mode 100644 index 000000000..99426d97a --- /dev/null +++ b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaLexer.g4 @@ -0,0 +1,90 @@ +lexer grammar SchemaLexer; + +@header { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +} + +// data types +// https://drill.apache.org/docs/supported-data-types/ +INT: 'INT'; +INTEGER: 'INTEGER'; +BIGINT: 'BIGINT'; + +FLOAT: 'FLOAT'; +DOUBLE: 'DOUBLE'; + +DEC: 'DEC'; +DECIMAL: 'DECIMAL'; +NUMERIC: 'NUMERIC'; + +BOOLEAN: 'BOOLEAN'; + +CHAR: 'CHAR'; +CHARACTER: 'CHARACTER'; +VARYING: 'VARYING'; +VARCHAR: 'VARCHAR'; +BINARY: 'BINARY'; +VARBINARY: 'VARBINARY'; + +TIME: 'TIME'; +DATE: 'DATE'; +TIMESTAMP: 'TIMESTAMP'; +INTERVAL: 'INTERVAL'; + +YEAR: 'YEAR'; +MONTH: 'MONTH'; +DAY: 'DAY'; +HOUR: 'HOUR'; +MINUTE: 'MINUTE'; +SECOND: 'SECOND'; + +MAP: 'MAP'; +ARRAY: 'ARRAY'; + +// symbols +COMMA: ','; +REVERSE_QUOTE: '`'; +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +LEFT_ANGLE_BRACKET: '<'; +RIGHT_ANGLE_BRACKET: '>'; + +NOT: 'NOT'; +NULL: 'NULL'; +AS: 'AS'; + +NUMBER: [1-9] DIGIT* | '0'; +fragment DIGIT: [0-9]; + +// identifiers + +// column name can start with any letter, dollar sign ($) or underscore (_), +// consequently can contain any letter, dollar sign ($), underscore (_) or digit +// if any other symbols are present, use QUOTED_ID +ID: ([A-Z$_]) ([A-Z$_] | DIGIT)*; + +// column name should be enclosed into backticks, can contain any symbols including space +// if contains backtick, it should be escaped with backslash (`a\\`b` -> a`b) +// if contains backslash, it should be escaped as well (`a\\\\b` -> a\b) +QUOTED_ID: REVERSE_QUOTE (~[`\\] | '\\' [`\\])* REVERSE_QUOTE; + +// skip +LINE_COMMENT: '//' ~[\r\n]* -> skip; +BLOCK_COMMENT: '/*' .*? 
'*/' -> skip; +SPACE: [ \n\t\r\u000C]+ -> skip; diff --git a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4 b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4 new file mode 100644 index 000000000..321e99f8a --- /dev/null +++ b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4 @@ -0,0 +1,72 @@ +parser grammar SchemaParser; + +options { + language=Java; + tokenVocab=SchemaLexer; +} + +@header { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +} + +schema: (columns | LEFT_PAREN columns RIGHT_PAREN) EOF; + +columns: column (COMMA column)*; + +column: (primitive_column | map_column | simple_array_column | complex_array_column); + +primitive_column: column_id simple_type nullability?; + +simple_array_column: column_id simple_array_type nullability?; + +map_column: column_id map_type nullability?; + +complex_array_column: column_id complex_array_type nullability?; + +column_id +: ID # id +| QUOTED_ID # quoted_id +; + +simple_type +: (INT | INTEGER) # int +| BIGINT # bigint +| FLOAT # float +| DOUBLE # double +| (DEC | DECIMAL | NUMERIC) (LEFT_PAREN NUMBER (COMMA NUMBER)? 
RIGHT_PAREN)? # decimal +| BOOLEAN # boolean +| (CHAR | VARCHAR | CHARACTER VARYING?) (LEFT_PAREN NUMBER RIGHT_PAREN)? # varchar +| (BINARY | VARBINARY) (LEFT_PAREN NUMBER RIGHT_PAREN)? # binary +| TIME (LEFT_PAREN NUMBER RIGHT_PAREN)? # time +| DATE # date +| TIMESTAMP (LEFT_PAREN NUMBER RIGHT_PAREN)? # timestamp +| INTERVAL (YEAR | MONTH) # interval_year +| INTERVAL (DAY | HOUR | MINUTE | SECOND) # interval_day +| INTERVAL # interval +; + +complex_type: (simple_array_type | complex_array_type); + +simple_array_type: ARRAY LEFT_ANGLE_BRACKET (simple_type | map_type) RIGHT_ANGLE_BRACKET; + +complex_array_type: ARRAY LEFT_ANGLE_BRACKET complex_type RIGHT_ANGLE_BRACKET; + +map_type: MAP LEFT_ANGLE_BRACKET columns RIGHT_ANGLE_BRACKET; + +nullability: NOT NULL; diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd index 5891fb77d..9da80af2b 100644 --- a/exec/java-exec/src/main/codegen/data/Parser.tdd +++ b/exec/java-exec/src/main/codegen/data/Parser.tdd @@ -36,7 +36,8 @@ "REFRESH", "METADATA", "IF", - "JAR" + "JAR", + "PROPERTIES" ] # List of methods for parsing custom SQL statements. @@ -46,11 +47,9 @@ "SqlDescribeSchema()" "SqlDescribeTable()", "SqlUseSchema()", - "SqlCreateOrReplaceView()", - "SqlDropView()", + "SqlCreateOrReplace()" + "SqlDrop()", "SqlShowFiles()", - "SqlCreateTable()", - "SqlDropTable()", "SqlRefreshMetadata()", "SqlCreateFunction()", "SqlDropFunction()" diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl index 8cccf4d07..8afc8f855 100644 --- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl +++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl @@ -179,27 +179,65 @@ SqlNodeList ParseRequiredFieldList(String relType) : } /** +* Parses CREATE [OR REPLACE] command for VIEW, TABLE or SCHEMA. 
+*/ +SqlNode SqlCreateOrReplace() : +{ + SqlParserPos pos; + String createType = "SIMPLE"; + boolean isTemporary = false; +} +{ + <CREATE> { pos = getPos(); } + [ <OR> <REPLACE> { createType = "OR_REPLACE"; } ] + [ <TEMPORARY> { isTemporary = true; } ] + ( + <VIEW> + { + if (isTemporary) { + throw new ParseException("Create view statement does not allow <TEMPORARY> keyword."); + } + return SqlCreateView(pos, createType); + } + | + <TABLE> + { + if (createType == "OR_REPLACE") { + throw new ParseException("Create table statement does not allow <OR><REPLACE>."); + } + return SqlCreateTable(pos, isTemporary); + + } + | + <SCHEMA> + { + if (isTemporary) { + throw new ParseException("Create schema statement does not allow <TEMPORARY> keyword."); + } + return SqlCreateSchema(pos, createType); + } + ) +} + +/** * Parses a create view or replace existing view statement. - * CREATE { [OR REPLACE] VIEW | VIEW [IF NOT EXISTS] | VIEW } view_name [ (field1, field2 ...) ] AS select_statement + * after CREATE OR REPLACE VIEW statement which is handled in the SqlCreateOrReplace method. + * + * CREATE { [OR REPLACE] VIEW | VIEW [IF NOT EXISTS] | VIEW } view_name [ (field1, field2 ...) 
] AS select_statement */ -SqlNode SqlCreateOrReplaceView() : +SqlNode SqlCreateView(SqlParserPos pos, String createType) : { - SqlParserPos pos; SqlIdentifier viewName; SqlNode query; SqlNodeList fieldList; - String createViewType = "SIMPLE"; } { - <CREATE> { pos = getPos(); } - [ <OR> <REPLACE> { createViewType = "OR_REPLACE"; } ] - <VIEW> [ <IF> <NOT> <EXISTS> { - if (createViewType == "OR_REPLACE") { + if (createType == "OR_REPLACE") { throw new ParseException("Create view statement cannot have both <OR REPLACE> and <IF NOT EXISTS> clause"); } - createViewType = "IF_NOT_EXISTS"; + createType = "IF_NOT_EXISTS"; } ] viewName = CompoundIdentifier() @@ -207,49 +245,28 @@ SqlNode SqlCreateOrReplaceView() : <AS> query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) { - return new SqlCreateView(pos, viewName, fieldList, query, SqlLiteral.createCharString(createViewType, getPos())); - } -} - -/** - * Parses a drop view or drop view if exists statement. - * DROP VIEW [IF EXISTS] view_name; - */ -SqlNode SqlDropView() : -{ - SqlParserPos pos; - boolean viewExistenceCheck = false; -} -{ - <DROP> { pos = getPos(); } - <VIEW> - [ <IF> <EXISTS> { viewExistenceCheck = true; } ] - { - return new SqlDropView(pos, CompoundIdentifier(), viewExistenceCheck); + return new SqlCreateView(pos, viewName, fieldList, query, SqlLiteral.createCharString(createType, getPos())); } } /** - * Parses a CTAS or CTTAS statement. + * Parses a CTAS or CTTAS statement after CREATE [TEMPORARY] TABLE statement + * which is handled in the SqlCreateOrReplace method. + * * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] tblname [ (field1, field2, ...) ] AS select_statement. 
*/ -SqlNode SqlCreateTable() : +SqlNode SqlCreateTable(SqlParserPos pos, boolean isTemporary) : { - SqlParserPos pos; SqlIdentifier tblName; SqlNodeList fieldList; SqlNodeList partitionFieldList; SqlNode query; - boolean isTemporary = false; boolean tableNonExistenceCheck = false; } { { partitionFieldList = SqlNodeList.EMPTY; } - <CREATE> { pos = getPos(); } - ( <TEMPORARY> { isTemporary = true; } )? - <TABLE> ( <IF> <NOT> <EXISTS> { tableNonExistenceCheck = true; } )? tblName = CompoundIdentifier() fieldList = ParseOptionalFieldList("Table") @@ -266,17 +283,159 @@ SqlNode SqlCreateTable() : } /** - * Parses a drop table or drop table if exists statement. - * DROP TABLE [IF EXISTS] table_name; +* Parses create table schema statement after CREATE OR REPLACE SCHEMA statement +* which is handled in the SqlCreateOrReplace method. +* +* CREATE [OR REPLACE] SCHEMA +* [ +* LOAD 'file:///path/to/raw_schema' +* | +* ( +* col1 int, +* col2 varchar(10) not null +* ) +* ] +* [FOR TABLE dfs.my_table] +* [PATH 'file:///path/to/schema'] +* [PROPERTIES ('prop1'='val1', 'prop2'='val2')] +*/ +SqlNode SqlCreateSchema(SqlParserPos pos, String createType) : +{ + SqlCharStringLiteral schema = null; + SqlNode load = null; + SqlIdentifier table = null; + SqlNode path = null; + SqlNodeList properties = null; +} +{ + { + token_source.pushState(); + token_source.SwitchTo(SCH); + } + ( + <LOAD> + { + load = StringLiteral(); + } + | + <PAREN_STRING> + { + schema = SqlLiteral.createCharString(token.image, getPos()); + } + ) + ( + <FOR> <TABLE> { table = CompoundIdentifier(); } + | + <PATH> + { + path = StringLiteral(); + if (createType == "OR_REPLACE") { + throw new ParseException("<OR REPLACE> cannot be used with <PATH> property."); + } + } + ) + [ + <PROPERTIES> <LPAREN> + { + properties = new SqlNodeList(getPos()); + addProperty(properties); + } + ( + <COMMA> + { addProperty(properties); } + )* + <RPAREN> + ] + { + return new SqlSchema.Create(pos, schema, load, table, path, properties, + 
SqlLiteral.createCharString(createType, getPos())); + } +} + +/** +* Helper method to add string literals divided by equals into SqlNodeList. +*/ +void addProperty(SqlNodeList properties) : +{} +{ + { properties.add(StringLiteral()); } + <EQ> + { properties.add(StringLiteral()); } +} + +<SCH> SKIP : +{ + " " +| "\t" +| "\n" +| "\r" +} + +<SCH> TOKEN : { + < LOAD: "LOAD" > { popState(); } + | < NUM: <DIGIT> (" " | "\t" | "\n" | "\r")* > + // once schema is found, switch back to initial lexical state + // must be enclosed in the parentheses + // inside may have left parenthesis only if number precedes (covers cases with varchar(10)), + // if left parenthesis is present in column name, it must be escaped with backslash + | < PAREN_STRING: <LPAREN> ((~[")"]) | (<NUM> ")") | ("\\)"))+ <RPAREN> > { popState(); } +} + +/** + * Parses DROP command for VIEW, TABLE and SCHEMA. */ -SqlNode SqlDropTable() : +SqlNode SqlDrop() : { SqlParserPos pos; - boolean tableExistenceCheck = false; } { <DROP> { pos = getPos(); } - <TABLE> + ( + <VIEW> + { + return SqlDropView(pos); + } + | + <TABLE> + { + return SqlDropTable(pos); + } + | + <SCHEMA> + { + return SqlDropSchema(pos); + } + ) +} + +/** + * Parses a drop view or drop view if exists statement + * after DROP VIEW statement which is handled in SqlDrop method. + * + * DROP VIEW [IF EXISTS] view_name; + */ +SqlNode SqlDropView(SqlParserPos pos) : +{ + boolean viewExistenceCheck = false; +} +{ + [ <IF> <EXISTS> { viewExistenceCheck = true; } ] + { + return new SqlDropView(pos, CompoundIdentifier(), viewExistenceCheck); + } +} + +/** + * Parses a drop table or drop table if exists statement + * after DROP TABLE statement which is handled in SqlDrop method. 
+ * + * DROP TABLE [IF EXISTS] table_name; + */ +SqlNode SqlDropTable(SqlParserPos pos) : +{ + boolean tableExistenceCheck = false; +} +{ [ <IF> <EXISTS> { tableExistenceCheck = true; } ] { return new SqlDropTable(pos, CompoundIdentifier(), tableExistenceCheck); @@ -284,6 +443,26 @@ SqlNode SqlDropTable() : } /** +* Parses drop schema or drop schema if exists statement +* after DROP SCHEMA statement which is handled in SqlDrop method. +* +* DROP SCHEMA [IF EXISTS] +* FOR TABLE dfs.my_table +*/ +SqlNode SqlDropSchema(SqlParserPos pos) : +{ + SqlIdentifier table = null; + boolean existenceCheck = false; +} +{ + [ <IF> <EXISTS> { existenceCheck = true; } ] + <FOR> <TABLE> { table = CompoundIdentifier(); } + { + return new SqlSchema.Drop(pos, table, SqlLiteral.createBoolean(existenceCheck, getPos())); + } +} + +/** * Parse refresh table metadata statement. * REFRESH TABLE METADATA tblname */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index c92f5f830..3dfab6728 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -111,7 +111,7 @@ public class DrillSqlWorker { } catch (Exception e) { logger.trace("There was an error during conversion into physical plan. 
" + "Will sync remote and local function registries if needed and retry " + - "in case if issue was due to missing function implementation."); + "in case if issue was due to missing function implementation.", e); if (context.getFunctionRegistry().syncWithRemoteRegistry( context.getDrillOperatorTable().getFunctionRegistryVersion())) { context.reloadDrillOperatorTable(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java index b0479dbbb..e07fb1589 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java @@ -217,7 +217,7 @@ public class SchemaUtilites { final AbstractSchema drillSchema = unwrapAsDrillSchemaInstance(schema); if (!drillSchema.isMutable()) { throw UserException.validationError() - .message("Unable to create or drop tables/views. Schema [%s] is immutable.", getSchemaPath(schema)) + .message("Unable to create or drop objects. Schema [%s] is immutable.", getSchemaPath(schema)) .build(logger); } @@ -291,4 +291,21 @@ public class SchemaUtilites { } } + /** + * If table schema is not indicated in sql call, returns temporary workspace. + * If schema is indicated, resolves to mutable table schema. 
+ * + * @param tableSchema table schema + * @param defaultSchema default schema + * @param config drill config + * @return resolved schema + */ + public static AbstractSchema resolveToTemporarySchema(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) { + if (tableSchema.size() == 0) { + return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config); + } else { + return SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema); + } + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java index 4f1a7596e..665bddddd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java @@ -17,15 +17,12 @@ */ package org.apache.drill.exec.planner.sql.handlers; -import java.io.IOException; import java.util.List; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.PhysicalPlan; @@ -55,7 +52,7 @@ public class DropTableHandler extends DefaultSqlHandler { * raise exception otherwise */ @Override - public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException { + public PhysicalPlan getPlan(SqlNode sqlNode) { SqlDropTable dropTableNode = ((SqlDropTable) sqlNode); String originalTableName = FileSelection.removeLeadingSlash(dropTableNode.getName()); SchemaPlus defaultSchema = config.getConverter().getDefaultSchema(); @@ -63,7 +60,7 @@ public 
class DropTableHandler extends DefaultSqlHandler { DrillConfig drillConfig = context.getConfig(); UserSession session = context.getSession(); - AbstractSchema temporarySchema = resolveToTemporarySchema(tableSchema, defaultSchema, drillConfig); + AbstractSchema temporarySchema = SchemaUtilites.resolveToTemporarySchema(tableSchema, defaultSchema, drillConfig); boolean isTemporaryTable = session.isTemporaryTable(temporarySchema, drillConfig, originalTableName); if (isTemporaryTable) { @@ -86,21 +83,4 @@ public class DropTableHandler extends DefaultSqlHandler { return DirectPlan.createDirectPlan(context, true, message); } - /** - * If table schema is not indicated in sql call, returns temporary workspace. - * If schema is indicated, resolves to mutable table schema. - * - * @param tableSchema table schema - * @param defaultSchema default schema - * @param config drill config - * @return resolved schema - */ - private AbstractSchema resolveToTemporarySchema(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) { - if (tableSchema.size() == 0) { - return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config); - } else { - return SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema); - } - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java new file mode 100644 index 000000000..488268318 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.handlers; + +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlNode; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.DirectPlan; +import org.apache.drill.exec.planner.sql.SchemaUtilites; +import org.apache.drill.exec.planner.sql.parser.SqlCreateType; +import org.apache.drill.exec.planner.sql.parser.SqlSchema; +import org.apache.drill.exec.record.metadata.schema.FsMetastoreSchemaProvider; +import org.apache.drill.exec.record.metadata.schema.PathSchemaProvider; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.StorageStrategy; +import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +/** + * Parent class 
for CREATE / DROP SCHEMA handlers. + * Contains common logic on how to extract workspace, output error result. + */ +public abstract class SchemaHandler extends DefaultSqlHandler { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaHandler.class); + + SchemaHandler(SqlHandlerConfig config) { + super(config); + } + + WorkspaceSchemaFactory.WorkspaceSchema getWorkspaceSchema(List<String> tableSchema, String tableName) { + SchemaPlus defaultSchema = config.getConverter().getDefaultSchema(); + AbstractSchema temporarySchema = SchemaUtilites.resolveToTemporarySchema(tableSchema, defaultSchema, context.getConfig()); + + if (context.getSession().isTemporaryTable(temporarySchema, context.getConfig(), tableName)) { + produceErrorResult(String.format("Indicated table [%s] is temporary table", tableName), true); + } + + AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema); + Table table = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName); + if (table == null || table.getJdbcTableType() != Schema.TableType.TABLE) { + produceErrorResult(String.format("Table [%s] was not found", tableName), true); + } + + if (!(drillSchema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) { + produceErrorResult(String.format("Table [`%s`.`%s`] must belong to file storage plugin", + drillSchema.getFullSchemaName(), tableName), true); + } + + Preconditions.checkState(drillSchema instanceof WorkspaceSchemaFactory.WorkspaceSchema); + return (WorkspaceSchemaFactory.WorkspaceSchema) drillSchema; + } + + PhysicalPlan produceErrorResult(String message, boolean doFail) { + if (doFail) { + throw UserException.validationError().message(message).build(logger); + } else { + return DirectPlan.createDirectPlan(context, false, message); + } + } + + /** + * CREATE SCHEMA command handler. 
+ */ + public static class Create extends SchemaHandler { + + public Create(SqlHandlerConfig config) { + super(config); + } + + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) { + + SqlSchema.Create sqlCall = ((SqlSchema.Create) sqlNode); + + String schemaString = getSchemaString(sqlCall); + String schemaSource = sqlCall.hasTable() ? sqlCall.getTable().toString() : sqlCall.getPath(); + try { + + SchemaProvider schemaProvider; + if (sqlCall.hasTable()) { + String tableName = sqlCall.getTableName(); + WorkspaceSchemaFactory.WorkspaceSchema wsSchema = getWorkspaceSchema(sqlCall.getSchemaPath(), tableName); + schemaProvider = new FsMetastoreSchemaProvider(wsSchema, tableName); + } else { + schemaProvider = new PathSchemaProvider(new Path(sqlCall.getPath())); + } + + if (schemaProvider.exists()) { + if (SqlCreateType.OR_REPLACE == sqlCall.getSqlCreateType()) { + schemaProvider.delete(); + } else { + return produceErrorResult(String.format("Schema already exists for [%s]", schemaSource), true); + } + } + + // schema file will be created with same permission as used for persistent tables + StorageStrategy storageStrategy = new StorageStrategy(context.getOption( + ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false); + schemaProvider.store(schemaString, sqlCall.getProperties(), storageStrategy); + return DirectPlan.createDirectPlan(context, true, String.format("Created schema for [%s]", schemaSource)); + + } catch (IOException e) { + throw UserException.resourceError(e) + .message(e.getMessage()) + .addContext("Error while preparing / creating schema for [%s]", schemaSource) + .build(logger); + } + } + + /** + * If raw schema was present in create schema command, returns schema from command, + * otherwise loads raw schema from the given file. 
+ * + * @param sqlCall sql create schema call + * @return string representation of raw schema (column names, types and nullability) + */ + private String getSchemaString(SqlSchema.Create sqlCall) { + if (sqlCall.hasSchema()) { + return sqlCall.getSchema(); + } + + Path path = new Path(sqlCall.getLoad()); + try { + FileSystem rawFs = path.getFileSystem(new Configuration()); + FileSystem fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), rawFs.getConf()); + + if (!fs.exists(path)) { + throw UserException.resourceError() + .message("File with raw schema [%s] does not exist", path.toUri().getPath()) + .build(logger); + } + + try (InputStream stream = fs.open(path)) { + return IOUtils.toString(stream); + } + + } catch (IOException e) { + throw UserException.resourceError(e) + .message("Unable to load raw schema from file %s", path.toUri().getPath()) + .build(logger); + } + } + } + + /** + * DROP SCHEMA command handler. + */ + public static class Drop extends SchemaHandler { + + public Drop(SqlHandlerConfig config) { + super(config); + } + + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) { + SqlSchema.Drop sqlCall = ((SqlSchema.Drop) sqlNode); + + String tableName = sqlCall.getTableName(); + WorkspaceSchemaFactory.WorkspaceSchema wsSchema = getWorkspaceSchema(sqlCall.getSchemaPath(), tableName); + + try { + + SchemaProvider schemaProvider = new FsMetastoreSchemaProvider(wsSchema, tableName); + + if (!schemaProvider.exists()) { + return produceErrorResult(String.format("Schema [%s] does not exist in table [%s] root directory", + SchemaProvider.DEFAULT_SCHEMA_NAME, sqlCall.getTable()), !sqlCall.ifExists()); + } + + schemaProvider.delete(); + + return DirectPlan.createDirectPlan(context, true, + String.format("Dropped schema for table [%s]", sqlCall.getTable())); + + } catch (IOException e) { + throw UserException.resourceError(e) + .message(e.getMessage()) + .addContext("Error while accessing table location or deleting schema for 
[%s]", sqlCall.getTable()) + .build(logger); + } + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java index 012315f05..f5b690a1e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java @@ -114,7 +114,7 @@ public abstract class ViewHandler extends DefaultSqlHandler { || context.getSession().isTemporaryTable(drillSchema, context.getConfig(), viewName); final boolean isView = (table != null && table.getJdbcTableType() == Schema.TableType.VIEW); - switch (view.getcreateViewType()) { + switch (view.getSqlCreateType()) { case SIMPLE: if (isTable) { throw UserException @@ -154,7 +154,7 @@ public abstract class ViewHandler extends DefaultSqlHandler { } @Override - public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException { + public PhysicalPlan getPlan(SqlNode sqlNode) throws IOException, ForemanSetupException { SqlDropView dropView = unwrap(sqlNode, SqlDropView.class); final String viewName = FileSelection.removeLeadingSlash(dropView.getName()); final AbstractSchema drillSchema = diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java index ffae0ef54..57a7e1753 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java @@ -79,6 +79,8 @@ public class CompoundIdentifierConverter extends SqlShuttle { .put(SqlSetOption.class, arrayOf(D, D, D)) .put(SqlCreateFunction.class, arrayOf(D)) 
.put(SqlDropFunction.class, arrayOf(D)) + .put(SqlSchema.Create.class, arrayOf(D, D, D, D, D, D)) + .put(SqlSchema.Drop.class, arrayOf(D, D)) .build(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateType.java new file mode 100644 index 000000000..d0ac25da8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateType.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.parser; + +/** + * Enum which indicates type of CREATE statement. + */ +public enum SqlCreateType { + + /** + * Attempts to execute CREATE command without checking if object to be created exists. + * Will fail if object to be created exists. + */ + SIMPLE, + + /** + * Before CREATE command execution checks if object to be created exists. + * If object to be created exists, will drop it and proceed execution. + */ + OR_REPLACE, + + /** + * Before CREATE command execution checks if object to be created exists. + * If object to be created exists, does nothing. 
+ */ + IF_NOT_EXISTS +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java index f61aeaa39..e37669756 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java @@ -47,19 +47,15 @@ public class SqlCreateView extends DrillSqlCall { private SqlIdentifier viewName; private SqlNodeList fieldList; private SqlNode query; - private SqlLiteral createViewType; - - public enum SqlCreateViewType { - SIMPLE, OR_REPLACE, IF_NOT_EXISTS - } + private SqlLiteral createType; public SqlCreateView(SqlParserPos pos, SqlIdentifier viewName, SqlNodeList fieldList, - SqlNode query, SqlLiteral createViewType) { + SqlNode query, SqlLiteral createType) { super(pos); this.viewName = viewName; this.query = query; this.fieldList = fieldList; - this.createViewType = createViewType; + this.createType = createType; } @Override @@ -73,14 +69,14 @@ public class SqlCreateView extends DrillSqlCall { ops.add(viewName); ops.add(fieldList); ops.add(query); - ops.add(createViewType); + ops.add(createType); return ops; } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("CREATE"); - switch (SqlCreateViewType.valueOf(createViewType.toValue())) { + switch (SqlCreateType.valueOf(createType.toValue())) { case SIMPLE: writer.keyword("VIEW"); break; @@ -135,6 +131,6 @@ public class SqlCreateView extends DrillSqlCall { public SqlNode getQuery() { return query; } - public SqlCreateViewType getcreateViewType() { return SqlCreateViewType.valueOf(createViewType.toValue()); } + public SqlCreateType getSqlCreateType() { return SqlCreateType.valueOf(createType.toValue()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java new file mode 100644 index 000000000..7985279a6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.sql.parser; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.util.SqlBasicVisitor; +import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; +import org.apache.drill.exec.planner.sql.handlers.SchemaHandler; +import org.apache.drill.exec.store.dfs.FileSelection; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Parent class for CREATE and DROP SCHEMA commands. + * Holds logic common command property: table. + */ +public abstract class SqlSchema extends DrillSqlCall { + + protected final SqlIdentifier table; + + protected SqlSchema(SqlParserPos pos, SqlIdentifier table) { + super(pos); + this.table = table; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + if (table != null) { + writer.keyword("FOR TABLE"); + table.unparse(writer, leftPrec, rightPrec); + } + } + + public boolean hasTable() { + return table != null; + } + + public SqlIdentifier getTable() { + return table; + } + + public List<String> getSchemaPath() { + if (hasTable()) { + return table.isSimple() ? Collections.emptyList() : table.names.subList(0, table.names.size() - 1); + } + return null; + } + + public String getTableName() { + if (hasTable()) { + String tableName = table.isSimple() ? 
table.getSimple() : table.names.get(table.names.size() - 1); + return FileSelection.removeLeadingSlash(tableName); + } + return null; + } + + /** + * Visits literal and returns bare value (i.e. without single quotes). + */ + private static class LiteralVisitor extends SqlBasicVisitor<String> { + + static final LiteralVisitor INSTANCE = new LiteralVisitor(); + + @Override + public String visit(SqlLiteral literal) { + return literal.toValue(); + } + + } + + /** + * CREATE SCHEMA sql call. + */ + public static class Create extends SqlSchema { + + private final SqlCharStringLiteral schema; + private final SqlNode load; + private final SqlNode path; + private final SqlNodeList properties; + private final SqlLiteral createType; + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_SCHEMA", SqlKind.OTHER_DDL) { + @Override + public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) { + return new Create(pos, (SqlCharStringLiteral) operands[0], operands[1], + (SqlIdentifier) operands[2], operands[3], (SqlNodeList) operands[4], (SqlLiteral) operands[5]); + } + }; + + public Create(SqlParserPos pos, + SqlCharStringLiteral schema, + SqlNode load, + SqlIdentifier table, + SqlNode path, + SqlNodeList properties, + SqlLiteral createType) { + super(pos, table); + this.schema = schema; + this.load = load; + this.path = path; + this.properties = properties; + this.createType = createType; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Arrays.asList(schema, load, table, path, properties, createType); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + + if (SqlCreateType.OR_REPLACE == getSqlCreateType()) { + writer.keyword("OR"); + writer.keyword("REPLACE"); + } + + writer.keyword("SCHEMA"); + writer.literal(getSchema()); + + super.unparse(writer, leftPrec, 
rightPrec); + + if (load != null) { + writer.keyword("LOAD"); + load.unparse(writer, leftPrec, rightPrec); + } + + if (path != null) { + writer.keyword("PATH"); + path.unparse(writer, leftPrec, rightPrec); + } + + if (properties != null) { + writer.keyword("PROPERTIES"); + writer.keyword("("); + + for (int i = 1; i < properties.size(); i += 2) { + if (i != 1) { + writer.keyword(","); + } + properties.get(i - 1).unparse(writer, leftPrec, rightPrec); + writer.keyword("="); + properties.get(i).unparse(writer, leftPrec, rightPrec); + } + + writer.keyword(")"); + } + } + + @Override + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new SchemaHandler.Create(config); + } + + public boolean hasSchema() { + return schema != null; + } + + public String getSchema() { + return hasSchema() ? schema.toValue() : null; + } + + public String getLoad() { + return load == null ? null : load.accept(LiteralVisitor.INSTANCE); + } + + public String getPath() { + return path == null ? null : path.accept(LiteralVisitor.INSTANCE); + } + + public Map<String, String> getProperties() { + if (properties == null) { + return null; + } + + // preserve properties order + Map<String, String> map = new LinkedHashMap<>(); + for (int i = 1; i < properties.size(); i += 2) { + map.put(properties.get(i - 1).accept(LiteralVisitor.INSTANCE), + properties.get(i).accept(LiteralVisitor.INSTANCE)); + } + return map; + } + + public SqlCreateType getSqlCreateType() { + return SqlCreateType.valueOf(createType.toValue()); + } + + } + + /** + * DROP SCHEMA sql call. + */ + public static class Drop extends SqlSchema { + + private final SqlLiteral existenceCheck; + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_SCHEMA", SqlKind.OTHER_DDL) { + @Override + public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... 
operands) { + return new Drop(pos, (SqlIdentifier) operands[0], (SqlLiteral) operands[1]); + } + }; + + public Drop(SqlParserPos pos, SqlIdentifier table, SqlLiteral existenceCheck) { + super(pos, table); + this.existenceCheck = existenceCheck; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Arrays.asList(table, existenceCheck); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DROP"); + writer.keyword("SCHEMA"); + + if (ifExists()) { + writer.keyword("IF"); + writer.keyword("EXISTS"); + } + + super.unparse(writer, leftPrec, rightPrec); + } + + @Override + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new SchemaHandler.Drop(config); + } + + public boolean ifExists() { + return existenceCheck.booleanValue(); + } + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java index 63dda0777..82a7ab574 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java @@ -220,4 +220,34 @@ public abstract class AbstractColumnMetadata implements ColumnMetadata { .append("]") .toString(); } + + @Override + public String typeString() { + return majorType().toString(); + } + + @Override + public String columnString() { + StringBuilder builder = new StringBuilder(); + builder.append("`").append(escapeSpecialSymbols(name())).append("`"); + builder.append(" "); + builder.append(typeString()); + + // Drill does not have nullability notion for complex types + if (!isNullable() && !isArray() && !isMap()) { + builder.append(" NOT NULL"); + } + + return builder.toString(); + } + + /** + * If given value contains 
backticks (`) or backslashes (\), escapes them. + * + * @param value string value + * @return updated value + */ + private String escapeSpecialSymbols(String value) { + return value.replaceAll("(\\\\)|(`)", "\\\\$0"); + } }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java index 9db2e3e7e..8d295e69f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java @@ -118,4 +118,18 @@ public class MapColumnMetadata extends AbstractColumnMetadata { .setMode(mode) .build()); } + + @Override + public String typeString() { + StringBuilder builder = new StringBuilder(); + if (isArray()) { + builder.append("ARRAY<"); + } + builder.append("MAP<").append(mapSchema.schemaString()).append(">"); + if (isArray()) { + builder.append(">"); + } + return builder.toString(); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java index ead6134cb..9781e1c99 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/PrimitiveColumnMetadata.java @@ -180,4 +180,50 @@ public class PrimitiveColumnMetadata extends AbstractColumnMetadata { @Override public MaterializedField emptySchema() { return schema(); } + + @Override + public String typeString() { + StringBuilder builder = new StringBuilder(); + if (isArray()) { + builder.append("ARRAY<"); + } + + switch (type) { + case VARDECIMAL: + builder.append("DECIMAL"); + break; + case FLOAT4: + builder.append("FLOAT"); + break; + case FLOAT8: + builder.append("DOUBLE"); + break; + case BIT: + builder.append("BOOLEAN"); + break; + case INTERVALYEAR: + builder.append("INTERVAL YEAR"); + break; + case INTERVALDAY: + builder.append("INTERVAL DAY"); + break; + default: + // other minor types 
names correspond to SQL-like equivalents + builder.append(type.name()); + } + + if (precision() > 0) { + builder.append("(").append(precision()); + if (scale() > 0) { + builder.append(", ").append(scale()); + } + builder.append(")"); + } + + if (isArray()) { + builder.append(">"); + } + return builder.toString(); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java index e677a0a23..b573151a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListColumnMetadata.java @@ -99,4 +99,10 @@ public class RepeatedListColumnMetadata extends AbstractColumnMetadata { return childSchema == null ? UNKNOWN_DIMENSIONS : childSchema.dimensions() + 1; } + + @Override + public String typeString() { + return "ARRAY<" + childSchema.typeString() + ">"; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java index 1196143a9..83dc91ac8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java @@ -183,6 +183,13 @@ public class TupleSchema implements TupleMetadata { public boolean isRoot() { return parentMap == null; } @Override + public String schemaString() { + return nameSpace.entries().stream() + .map(ColumnMetadata::columnString) + .collect(Collectors.joining(", ")); + } + + @Override public String toString() { StringBuilder builder = new StringBuilder() .append("[") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java new file mode 100644 index 000000000..ff3d1a7e3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata.schema; + +import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Map; + +/** + * Is used to provide schema based on table location on file system + * and default schema file name {@link SchemaProvider#DEFAULT_SCHEMA_NAME}. 
+ */ +public class FsMetastoreSchemaProvider extends PathSchemaProvider { + + private final String tableName; + + public FsMetastoreSchemaProvider(WorkspaceSchemaFactory.WorkspaceSchema wsSchema, String tableName) throws IOException { + super(wsSchema.getFS(), generatePath(wsSchema, tableName)); + this.tableName = String.format("%s.`%s`", wsSchema.getFullSchemaName(), tableName); + } + + private static Path generatePath(WorkspaceSchemaFactory.WorkspaceSchema wsSchema, String tableName) throws IOException { + Path tablePath = new Path(wsSchema.getDefaultLocation(), tableName); + if (wsSchema.getFS().isFile(tablePath)) { + throw new IOException(String.format("Indicated table [%s.%s] must be a directory", wsSchema.getFullSchemaName(), tableName)); + } + return new Path(tablePath, DEFAULT_SCHEMA_NAME); + } + + @Override + protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) { + return new SchemaContainer(tableName, schema, properties); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java new file mode 100644 index 000000000..8b766183c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata.schema; + +import org.apache.drill.exec.store.StorageStrategy; + +import java.util.Map; + +/** + * Is used to provide schema when passed using table function. + */ +public class InlineSchemaProvider implements SchemaProvider { + + private final String schema; + private final Map<String, String> properties; + + public InlineSchemaProvider(String schema, Map<String, String> properties) { + this.schema = schema; + this.properties = properties; + } + + @Override + public void delete() { + throw new UnsupportedOperationException("Schema deletion is not supported"); + } + + @Override + public void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) { + throw new UnsupportedOperationException("Schema storage is not supported"); + } + + @Override + public SchemaContainer read() { + return new SchemaContainer(null, schema, properties); + } + + @Override + public boolean exists() { + return true; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java new file mode 100644 index 000000000..28754aadc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata.schema; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.util.DefaultIndenter; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.drill.exec.store.StorageStrategy; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT; + +/** + * Is used to provide schema using given schema file name and path. + */ +public class PathSchemaProvider implements SchemaProvider { + + /** + * Reader used to read JSON schema from file into {@link SchemaContainer}. + * Allows comments inside the JSON file. + */ + private static final ObjectReader READER; + + /** + * Writer used to write content from {@link SchemaContainer} into JSON file. 
+ */ + private static final ObjectWriter WRITER; + + static { + ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT).configure(JsonParser.Feature.ALLOW_COMMENTS, true); + + READER = mapper.readerFor(SchemaContainer.class); + + DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter(); + prettyPrinter = prettyPrinter.withArrayIndenter(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE); + WRITER = mapper.writer(prettyPrinter); + } + + private final Path path; + private final FileSystem fs; + + public PathSchemaProvider(Path path) throws IOException { + this(createFsFromPath(path), path); + } + + public PathSchemaProvider(FileSystem fs, Path path) throws IOException { + this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf()); + + if (!fs.exists(path.getParent())) { + throw new IOException(String.format("Parent path for schema file [%s] does not exist", path.toUri().getPath())); + } + + this.path = path; + } + + private static FileSystem createFsFromPath(Path path) throws IOException { + return path.getFileSystem(new Configuration()); + } + + @Override + public void delete() throws IOException { + try { + if (!fs.delete(path, false)) { + throw new IOException(String.format("Error while deleting schema file [%s]", path.toUri().getPath())); + } + } catch (IOException e1) { + // re-check file existence to cover concurrent deletion case + try { + if (exists()) { + throw e1; + } + } catch (IOException e2) { + // ignore new exception and throw original one + throw e1; + } + } + } + + @Override + public void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) throws IOException { + SchemaContainer tableSchema = createTableSchema(schema, properties); + + try (OutputStream stream = fs.create(path, false)) { + WRITER.writeValue(stream, tableSchema); + } + storageStrategy.applyToFile(fs, path); + } + + @Override + public SchemaContainer read() throws IOException { + try (InputStream stream = 
fs.open(path)) { + return READER.readValue(stream); + } + } + + @Override + public boolean exists() throws IOException { + return fs.exists(path); + } + + protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) { + return new SchemaContainer(null, schema, properties); + } + +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java new file mode 100644 index 000000000..e705be2eb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.record.metadata.schema; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Holder class that contains table name, schema definition + * and properties passed in schema file or using table function. + */ +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +public class SchemaContainer { + + private final String table; + private final TupleMetadata schema; + // preserve properties order + private final Map<String, String> properties = new LinkedHashMap<>(); + private final Version version; + + @JsonCreator + public SchemaContainer(@JsonProperty("table") String table, + @JsonProperty("schema") List<String> schema, + @JsonProperty("properties") LinkedHashMap<String, String> properties, + @JsonProperty("version") Integer version) { + this(table, schema == null ? null : String.join(", ", schema), properties, version); + } + + public SchemaContainer(String table, String schema, Map<String, String> properties) { + this(table, schema, properties, Version.VERSION_1); //current default version + } + + public SchemaContainer(String table, String schema, Map<String, String> properties, Integer version) { + this.table = table; + this.schema = schema == null ? null : convert(schema); + if (properties != null) { + this.properties.putAll(properties); + } + this.version = new Version(version); + } + + @JsonProperty("table") + public String getTable() { + return table; + } + + @JsonProperty("schema") + public List<String> getSchemaList() { + return schema == null ? 
null : schema.toMetadataList().stream() + .map(ColumnMetadata::columnString) + .collect(Collectors.toList()); + } + + @JsonProperty("properties") + public Map<String, String> getProperties() { + return properties; + } + + @JsonProperty("version") + public Integer getVersionValue() { + return version.getValue(); + } + + @JsonIgnore + public TupleMetadata getSchema() { + return schema; + } + + @JsonIgnore + public Version getVersion() { + return version; + } + + private TupleMetadata convert(String schema) { + return SchemaExprParser.parseSchema(schema); + } + + @Override + public String toString() { + return "SchemaContainer{" + "table='" + table + '\'' + ", schema=" + schema + + ", properties=" + properties + ", version=" + version + '}'; + } + + /** + * Schema container version holder contains version in int representation. + * If during initialization null or less than 1 was given, replaces it with + * {@link #UNDEFINED_VERSION} value. + */ + public static class Version { + + public static final int UNDEFINED_VERSION = -1; + public static final int VERSION_1 = 1; + + // is used for testing + public static final int CURRENT_DEFAULT_VERSION = VERSION_1; + + private final int value; + + public Version(Integer value) { + this.value = value == null || value < 1 ? 
UNDEFINED_VERSION : value; + } + + public int getValue() { + return value; + } + + public boolean isUndefined() { + return UNDEFINED_VERSION == value; + } + + public int compare(Version versionToCompare) { + return Integer.compare(value, versionToCompare.value); + } + + @Override + public String toString() { + return "Version{" + "value=" + value + '}'; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java new file mode 100644 index 000000000..343e0eded --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata.schema; + +import org.apache.drill.exec.store.StorageStrategy; + +import java.io.IOException; +import java.util.Map; + +/** + * Provides mechanisms to manage schema: store / read / delete. + */ +public interface SchemaProvider { + + /** + * Default schema file name where schema is stored on file system. + * File is hidden to avoid including it when reading table data. 
+ */ + String DEFAULT_SCHEMA_NAME = ".drill.schema"; + + /** + * Deletes schema. + */ + void delete() throws IOException; + + /** + * Stores given schema definition and properties. + * If schema is stored in a file, will apply certain permission using {@link StorageStrategy}. + * + * @param schema schema definition + * @param properties map of properties + * @param storageStrategy storage strategy + */ + void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) throws IOException; + + /** + * Reads schema into {@link SchemaContainer}. Depending on implementation, can read from a file + * or from the given input. + * + * @return table schema instance + */ + SchemaContainer read() throws IOException; + + /** + * Checks if schema exists. + * + * @return true if schema exists, false otherwise + */ + boolean exists() throws IOException; + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java new file mode 100644 index 000000000..3cf376215 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata.schema.parser; + +import org.antlr.v4.runtime.BaseErrorListener; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CodePointCharStream; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.RecognitionException; +import org.antlr.v4.runtime.Recognizer; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +public class SchemaExprParser { + + /** + * Parses string definition of the schema and converts it + * into {@link TupleMetadata} instance. + * + * @param schema schema definition + * @return metadata description of the schema + */ + public static TupleMetadata parseSchema(String schema) { + SchemaVisitor visitor = new SchemaVisitor(); + return visitor.visit(initParser(schema).schema()); + } + + /** + * Parses string definition of the column and converts it + * into {@link ColumnMetadata} instance. 
+ * + * @param column column definition + * @return metadata description of the column + */ + public static ColumnMetadata parseColumn(String column) { + SchemaVisitor.ColumnVisitor visitor = new SchemaVisitor.ColumnVisitor(); + return visitor.visit(initParser(column).column()); + } + + private static SchemaParser initParser(String value) { + CodePointCharStream stream = CharStreams.fromString(value); + UpperCaseCharStream upperCaseStream = new UpperCaseCharStream(stream); + + SchemaLexer lexer = new SchemaLexer(upperCaseStream); + lexer.removeErrorListeners(); + lexer.addErrorListener(ErrorListener.INSTANCE); + + CommonTokenStream tokens = new CommonTokenStream(lexer); + + SchemaParser parser = new SchemaParser(tokens); + parser.removeErrorListeners(); + parser.addErrorListener(ErrorListener.INSTANCE); + + return parser; + } + + /** + * Custom error listener that converts all syntax errors into {@link SchemaParsingException}. + */ + private static class ErrorListener extends BaseErrorListener { + + static final ErrorListener INSTANCE = new ErrorListener(); + + @Override + public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol, int line, + int charPositionInLine, String msg, RecognitionException e) { + StringBuilder builder = new StringBuilder(); + builder.append("Line [").append(line).append("]"); + builder.append(", position [").append(charPositionInLine).append("]"); + if (offendingSymbol != null) { + builder.append(", offending symbol ").append(offendingSymbol); + } + if (msg != null) { + builder.append(": ").append(msg); + } + throw new SchemaParsingException(builder.toString()); + } + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaParsingException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaParsingException.java new file mode 100644 index 000000000..4a7cb6ffc --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaParsingException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record.metadata.schema.parser; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +/** + * Is thrown when parsing schema using ANTLR4 parser. + */ +public class SchemaParsingException extends DrillRuntimeException { + + public SchemaParsingException(String message) { + super(message); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java new file mode 100644 index 000000000..7c7663a71 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Visits schema and stores metadata about its columns into {@link TupleMetadata} class.
 */
public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {

  @Override
  public TupleMetadata visitSchema(SchemaParser.SchemaContext ctx) {
    // a schema is simply the list of its columns
    return visitColumns(ctx.columns());
  }

  @Override
  public TupleMetadata visitColumns(SchemaParser.ColumnsContext ctx) {
    TupleMetadata schema = new TupleSchema();
    ColumnVisitor columnVisitor = new ColumnVisitor();
    ctx.column().forEach(
      c -> schema.addColumn(c.accept(columnVisitor))
    );
    return schema;
  }

  /**
   * Visits various types of columns (primitive, map, array) and stores their metadata
   * into {@link ColumnMetadata} class.
   */
  public static class ColumnVisitor extends SchemaParserBaseVisitor<ColumnMetadata> {

    @Override
    public ColumnMetadata visitPrimitive_column(SchemaParser.Primitive_columnContext ctx) {
      String name = ctx.column_id().accept(new IdVisitor());
      // no nullability clause => column is nullable (OPTIONAL), otherwise REQUIRED
      // (the nullability rule presumably corresponds to NOT NULL — confirm against the grammar)
      TypeProtos.DataMode mode = ctx.nullability() == null ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED;
      return ctx.simple_type().accept(new TypeVisitor(name, mode));
    }

    @Override
    public ColumnMetadata visitSimple_array_column(SchemaParser.Simple_array_columnContext ctx) {
      String name = ctx.column_id().accept(new IdVisitor());
      return ctx.simple_array_type().accept(new ArrayTypeVisitor(name));
    }

    @Override
    public ColumnMetadata visitMap_column(SchemaParser.Map_columnContext ctx) {
      String name = ctx.column_id().accept(new IdVisitor());
      // Drill does not distinguish between nullable and not null map, by default they are not null
      return ctx.map_type().accept(new TypeVisitor(name, TypeProtos.DataMode.REQUIRED));
    }

    @Override
    public ColumnMetadata visitComplex_array_column(SchemaParser.Complex_array_columnContext ctx) {
      String name = ctx.column_id().accept(new IdVisitor());
      // build the innermost element first, then wrap it into a repeated list
      ColumnMetadata child = ctx.complex_array_type().complex_type().accept(new ArrayTypeVisitor(name));
      RepeatedListBuilder builder = new RepeatedListBuilder(null, name);
      builder.addColumn(child);
      return builder.buildColumn();
    }
  }

  /**
   * Visits ID and QUOTED_ID column names, returning their string representation.
   */
  private static class IdVisitor extends SchemaParserBaseVisitor<String> {

    @Override
    public String visitId(SchemaParser.IdContext ctx) {
      return ctx.ID().getText();
    }

    @Override
    public String visitQuoted_id(SchemaParser.Quoted_idContext ctx) {
      String text = ctx.QUOTED_ID().getText();
      // first strip the first and last symbols (the enclosing backticks),
      // then un-escape: every character preceded by a backslash loses the backslash
      return text.substring(1, text.length() - 1).replaceAll("\\\\(.)", "$1");
    }
  }

  /**
   * Visits simple and map types, storing their metadata into {@link ColumnMetadata} holder.
   * Carries the column name and data mode chosen by the calling visitor.
   */
  private static class TypeVisitor extends SchemaParserBaseVisitor<ColumnMetadata> {

    private final String name;
    private final TypeProtos.DataMode mode;

    TypeVisitor(String name, TypeProtos.DataMode mode) {
      this.name = name;
      this.mode = mode;
    }

    @Override
    public ColumnMetadata visitInt(SchemaParser.IntContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.INT, mode));
    }

    @Override
    public ColumnMetadata visitBigint(SchemaParser.BigintContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.BIGINT, mode));
    }

    @Override
    public ColumnMetadata visitFloat(SchemaParser.FloatContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.FLOAT4, mode));
    }

    @Override
    public ColumnMetadata visitDouble(SchemaParser.DoubleContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.FLOAT8, mode));
    }

    @Override
    public ColumnMetadata visitDecimal(SchemaParser.DecimalContext ctx) {
      TypeProtos.MajorType type = Types.withMode(TypeProtos.MinorType.VARDECIMAL, mode);

      // optional (precision [, scale]); when only precision is given, scale defaults to 0,
      // when neither is given the type keeps its default precision and scale
      List<TerminalNode> numbers = ctx.NUMBER();
      if (!numbers.isEmpty()) {
        int precision = Integer.parseInt(numbers.get(0).getText());
        int scale = numbers.size() == 2 ? Integer.parseInt(numbers.get(1).getText()) : 0;
        type = type.toBuilder().setPrecision(precision).setScale(scale).build();
      }

      return constructColumn(type);
    }

    @Override
    public ColumnMetadata visitBoolean(SchemaParser.BooleanContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.BIT, mode));
    }

    @Override
    public ColumnMetadata visitVarchar(SchemaParser.VarcharContext ctx) {
      TypeProtos.MajorType type = Types.withMode(TypeProtos.MinorType.VARCHAR, mode);

      // optional length, stored as precision
      if (ctx.NUMBER() != null) {
        type = type.toBuilder().setPrecision(Integer.parseInt(ctx.NUMBER().getText())).build();
      }

      return constructColumn(type);
    }

    @Override
    public ColumnMetadata visitBinary(SchemaParser.BinaryContext ctx) {
      TypeProtos.MajorType type = Types.withMode(TypeProtos.MinorType.VARBINARY, mode);

      // optional length, stored as precision
      if (ctx.NUMBER() != null) {
        type = type.toBuilder().setPrecision(Integer.parseInt(ctx.NUMBER().getText())).build();
      }

      return constructColumn(type);
    }

    @Override
    public ColumnMetadata visitTime(SchemaParser.TimeContext ctx) {
      TypeProtos.MajorType type = Types.withMode(TypeProtos.MinorType.TIME, mode);

      // optional fractional-seconds precision
      if (ctx.NUMBER() != null) {
        type = type.toBuilder().setPrecision(Integer.parseInt(ctx.NUMBER().getText())).build();
      }

      return constructColumn(type);
    }

    @Override
    public ColumnMetadata visitDate(SchemaParser.DateContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.DATE, mode));
    }

    @Override
    public ColumnMetadata visitTimestamp(SchemaParser.TimestampContext ctx) {
      TypeProtos.MajorType type = Types.withMode(TypeProtos.MinorType.TIMESTAMP, mode);

      // optional fractional-seconds precision
      if (ctx.NUMBER() != null) {
        type = type.toBuilder().setPrecision(Integer.parseInt(ctx.NUMBER().getText())).build();
      }

      return constructColumn(type);
    }

    @Override
    public ColumnMetadata visitInterval_year(SchemaParser.Interval_yearContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.INTERVALYEAR, mode));
    }

    @Override
    public ColumnMetadata visitInterval_day(SchemaParser.Interval_dayContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.INTERVALDAY, mode));
    }

    @Override
    public ColumnMetadata visitInterval(SchemaParser.IntervalContext ctx) {
      return constructColumn(Types.withMode(TypeProtos.MinorType.INTERVAL, mode));
    }

    @Override
    public ColumnMetadata visitMap_type(SchemaParser.Map_typeContext ctx) {
      // a map column is built by recursively visiting its nested columns
      MapBuilder builder = new MapBuilder(null, name, mode);
      ColumnVisitor visitor = new ColumnVisitor();
      ctx.columns().column().forEach(
        c -> builder.addColumn(c.accept(visitor))
      );
      return builder.buildColumn();
    }

    // wraps the resolved major type into column metadata under the visitor's column name
    private ColumnMetadata constructColumn(TypeProtos.MajorType type) {
      MaterializedField field = MaterializedField.create(name, type);
      return MetadataUtils.fromField(field);
    }
  }

  /**
   * Visits array type: simple (which has only one nested element: array&lt;int&gt;)
   * or complex (which has several nested elements: array&lt;array&lt;int&gt;&gt;).
   */
  private static class ArrayTypeVisitor extends SchemaParserBaseVisitor<ColumnMetadata> {

    private final String name;

    ArrayTypeVisitor(String name) {
      this.name = name;
    }

    @Override
    public ColumnMetadata visitSimple_array_type(SchemaParser.Simple_array_typeContext ctx) {
      // the element is either a simple type or a map; mode is REPEATED in both cases
      TypeVisitor visitor = new TypeVisitor(name, TypeProtos.DataMode.REPEATED);
      return ctx.map_type() == null ? ctx.simple_type().accept(visitor) : ctx.map_type().accept(visitor);
    }

    @Override
    public ColumnMetadata visitComplex_array_type(SchemaParser.Complex_array_typeContext ctx) {
      // recurse one nesting level down, then wrap the result into a repeated list
      RepeatedListBuilder childBuilder = new RepeatedListBuilder(null, name);
      ColumnMetadata child = ctx.complex_type().accept(new ArrayTypeVisitor(name));
      childBuilder.addColumn(child);
      return childBuilder.buildColumn();
    }
  }
}
+ * Allows building lexical rules match only upper case, making lexer easier to read. + */ +public class UpperCaseCharStream implements CharStream { + + private final CharStream stream; + + public UpperCaseCharStream(CharStream stream) { + this.stream = stream; + } + + @Override + public String getText(Interval interval) { + return stream.getText(interval); + } + + @Override + public void consume() { + stream.consume(); + } + + @Override + public int LA(int i) { + int c = stream.LA(i); + if (c <= 0) { + return c; + } + return Character.toUpperCase(c); + } + + @Override + public int mark() { + return stream.mark(); + } + + @Override + public void release(int marker) { + stream.release(marker); + } + + @Override + public int index() { + return stream.index(); + } + + @Override + public void seek(int index) { + stream.seek(index); + } + + @Override + public int size() { + return stream.size(); + } + + @Override + public String getSourceName() { + return stream.getSourceName(); + } +} |