aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
blob: e07fb15896b2e75badb3268f60d2c3e81075e438 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.sql;

import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Static helpers for locating schemas in a schema tree, rendering schema paths and
 * validating that a schema can be used for creating or dropping tables, views and temporary tables.
 */
public class SchemaUtilites {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaUtilites.class);

  /** Joins schema path components with "." as separator; {@code null} components are skipped. */
  public static final Joiner SCHEMA_PATH_JOINER = Joiner.on(".").skipNulls();

  /**
   * Search and return schema with given schemaPath. First search in schema tree starting from defaultSchema,
   * if not found search starting from rootSchema. Root schema tree is derived from the defaultSchema reference.
   *
   * @param defaultSchema Reference to the default schema in complete schema tree.
   * @param schemaPath Schema path to search. An empty path resolves to <i>defaultSchema</i> itself.
   * @return SchemaPlus object, or {@code null} if the path matches neither starting point.
   */
  public static SchemaPlus findSchema(final SchemaPlus defaultSchema, final List<String> schemaPath) {
    if (schemaPath.isEmpty()) {
      return defaultSchema;
    }

    final SchemaPlus relativeToDefault = searchSchemaTree(defaultSchema, schemaPath);
    if (relativeToDefault != null) {
      return relativeToDefault;
    }

    // Walk up the parent chain to reach the root of the tree.
    SchemaPlus rootSchema = defaultSchema;
    while (rootSchema.getParentSchema() != null) {
      rootSchema = rootSchema.getParentSchema();
    }

    // If the default schema is the root, the search above already covered it.
    if (rootSchema == defaultSchema) {
      return null;
    }
    return searchSchemaTree(rootSchema, schemaPath);
  }

  /**
   * Same utility as {@link #findSchema(SchemaPlus, List)} except the search schema path given here is complete path
   * instead of list. Use "." separator to divide the schema into nested schema names.
   *
   * @param defaultSchema default schema
   * @param schemaPath current schema path
   * @return found schema, or {@code null} if no schema matches the path
   */
  public static SchemaPlus findSchema(final SchemaPlus defaultSchema, final String schemaPath) {
    return findSchema(defaultSchema, getSchemaPathAsList(schemaPath));
  }

  /**
   * Utility function to get the common prefix between two supplied schema paths.
   * The comparison is a plain character-wise string prefix, it is not done per path component.
   *
   * Eg: if the defaultSchema is DFS and the schemaPath is dfs.tmp.`clicks.json`
   *     then this function returns dfs if isCaseSensitive is false (both inputs are
   *     lower-cased before they are compared), otherwise it returns empty string.
   *
   * @param defaultSchema default schema
   * @param schemaPath current schema path
   * @param isCaseSensitive true if case-sensitive comparison is required.
   * @return common prefix schemaPath; lower-cased when the comparison is case-insensitive
   */
  public static String getPrefixSchemaPath(final String defaultSchema,
                                           final String schemaPath,
                                           final boolean isCaseSensitive) {
    if (isCaseSensitive) {
      return Strings.commonPrefix(defaultSchema, schemaPath);
    }
    return Strings.commonPrefix(defaultSchema.toLowerCase(), schemaPath.toLowerCase());
  }

  /**
   * Utility method to search for schema path starting from the given <i>schema</i> reference.
   * Returns {@code null} as soon as one path component has no matching sub-schema.
   */
  private static SchemaPlus searchSchemaTree(SchemaPlus schema, final List<String> schemaPath) {
    SchemaPlus current = schema;
    for (String schemaName : schemaPath) {
      // schemas in Drill are case insensitive and stored in lower case
      current = current.getSubSchema(schemaName.toLowerCase());
      if (current == null) {
        return null;
      }
    }
    return current;
  }

  /**
   * @param schema current schema
   * @return true if the given <i>schema</i> is root schema, i.e. it is {@code null} or has no parent schema.
   *         False otherwise.
   */
  public static boolean isRootSchema(SchemaPlus schema) {
    return schema == null || schema.getParentSchema() == null;
  }

  /**
   * Unwrap given <i>SchemaPlus</i> instance as Drill schema instance (<i>AbstractSchema</i>). Once unwrapped, return
   * default schema from <i>AbstractSchema</i>.
   *
   * @param schemaPlus schema to unwrap
   * @return default schema of the unwrapped Drill schema
   * @throws UserException validation error if a {@link ClassCastException} is raised while unwrapping,
   *         i.e. the given schema is not a Drill schema
   */
  public static AbstractSchema unwrapAsDrillSchemaInstance(SchemaPlus schemaPlus)  {
    try {
      return (AbstractSchema) schemaPlus.unwrap(AbstractSchema.class).getDefaultSchema();
    } catch (ClassCastException e) {
      throw UserException.validationError(e)
          .message("Schema [%s] is not a Drill schema.", getSchemaPath(schemaPlus))
          .build(logger);
    }
  }

  /** Utility method to get the schema path for given schema instance. */
  public static String getSchemaPath(SchemaPlus schema) {
    return SCHEMA_PATH_JOINER.join(getSchemaPathAsList(schema));
  }

  /** Utility method to get the schema path for given list of schema path. */
  public static String getSchemaPath(List<String> schemaPath) {
    return SCHEMA_PATH_JOINER.join(schemaPath);
  }

  /** Utility method to get the list with schema path components for given schema path string. */
  public static List<String> getSchemaPathAsList(String schemaPath) {
    return Arrays.asList(schemaPath.split("\\."));
  }

  /**
   * Utility method to get the schema path as list for given schema instance.
   * The list is ordered from the outermost schema to the given one; schemas with a null or empty name
   * are left out. A root schema yields an empty list.
   */
  public static List<String> getSchemaPathAsList(SchemaPlus schema) {
    if (isRootSchema(schema)) {
      return Collections.emptyList();
    }

    // Names are collected from the given schema upwards and reversed at the end.
    List<String> path = Lists.newArrayListWithCapacity(5);
    for (SchemaPlus current = schema; current != null; current = current.getParentSchema()) {
      final String name = current.getName();
      if (!Strings.isNullOrEmpty(name)) {
        path.add(name);
      }
    }

    return Lists.reverse(path);
  }

  /**
   * Utility method to throw {@link UserException} with context information.
   * This method never returns normally.
   *
   * @param defaultSchema current default schema, reported as context of the error
   * @param givenSchemaPath schema path that could not be resolved
   * @throws UserException always
   */
  public static void throwSchemaNotFoundException(final SchemaPlus defaultSchema, final String givenSchemaPath) {
    throw UserException.validationError()
        .message("Schema [%s] is not valid with respect to either root schema or current default schema.",
            givenSchemaPath)
        .addContext("Current default schema: ",
            isRootSchema(defaultSchema) ? "No default schema selected" : getSchemaPath(defaultSchema))
        .build(logger);
  }

  /**
   * Utility method to throw {@link UserException} with context information.
   * The path is rendered in dotted form, the same way as in
   * {@link #throwSchemaNotFoundException(SchemaPlus, String)}. This method never returns normally.
   *
   * @param defaultSchema current default schema, reported as context of the error
   * @param givenSchemaPath schema path components that could not be resolved
   * @throws UserException always
   */
  public static void throwSchemaNotFoundException(final SchemaPlus defaultSchema, final List<String> givenSchemaPath) {
    // Join the components so the message reads "Schema [a.b]" rather than "Schema [[a, b]]".
    final String dottedPath = givenSchemaPath == null ? null : getSchemaPath(givenSchemaPath);
    throwSchemaNotFoundException(defaultSchema, dottedPath);
  }

  /**
   * Given reference to default schema in schema tree, search for schema with given <i>schemaPath</i>. Once a schema is
   * found resolve it into a mutable <i>AbstractDrillSchema</i> instance. A {@link UserException} is thrown when:
   * <ul>
   *   <li>No schema for given <i>schemaPath</i> is found.</li>
   *   <li>Schema found for given <i>schemaPath</i> is a root schema.</li>
   *   <li>Resolved schema is not a mutable schema.</li>
   * </ul>
   *
   * @param defaultSchema default schema
   * @param schemaPath current schema path
   * @return mutable schema, exception otherwise
   */
  public static AbstractSchema resolveToMutableDrillSchema(final SchemaPlus defaultSchema, List<String> schemaPath) {
    final SchemaPlus schema = findSchema(defaultSchema, schemaPath);

    if (schema == null) {
      throwSchemaNotFoundException(defaultSchema, getSchemaPath(schemaPath));
    }

    if (isRootSchema(schema)) {
      throw UserException.validationError()
          .message("Root schema is immutable. Creating or dropping tables/views is not allowed in root schema. " +
              "Select a schema using 'USE schema' command.")
          .build(logger);
    }

    final AbstractSchema drillSchema = unwrapAsDrillSchemaInstance(schema);
    if (!drillSchema.isMutable()) {
      throw UserException.validationError()
          .message("Unable to create or drop objects. Schema [%s] is immutable.", getSchemaPath(schema))
          .build(logger);
    }

    return drillSchema;
  }

  /**
   * Looks in schema tree for default temporary workspace instance.
   * The configured workspace name is looked up as one single path component, it is not split on ".".
   *
   * @param defaultSchema default schema
   * @param config drill config
   * @return default temporary workspace, null if workspace was not found
   */
  public static AbstractSchema getTemporaryWorkspace(SchemaPlus defaultSchema, DrillConfig config) {
    final String temporarySchema = config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE);
    final List<String> temporarySchemaPath = Lists.newArrayList(temporarySchema);
    final SchemaPlus schema = findSchema(defaultSchema, temporarySchemaPath);
    if (schema == null) {
      return null;
    }
    return unwrapAsDrillSchemaInstance(schema);
  }

  /**
   * Checks that passed schema path is the same as temporary workspace path.
   * Check is case-sensitive.
   *
   * @param schemaPath schema path
   * @param config drill config
   * @return true if schema path corresponds to temporary workspace, false otherwise
   */
  public static boolean isTemporaryWorkspace(String schemaPath, DrillConfig config) {
    return schemaPath.equals(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE));
  }

  /**
   * Makes sure that passed workspace exists, is default temporary workspace, mutable and file-based
   * (instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}).
   *
   * @param schema drill schema
   * @param config drill config
   * @return mutable and file-based workspace instance, otherwise throws validation error
   * @throws UserException validation error if any of the checks above fails
   */
  public static WorkspaceSchemaFactory.WorkspaceSchema resolveToValidTemporaryWorkspace(AbstractSchema schema,
                                                                                        DrillConfig config) {
    if (schema == null) {
      throw UserException.validationError()
          .message("Default temporary workspace is not found")
          .build(logger);
    }

    if (!isTemporaryWorkspace(schema.getFullSchemaName(), config)) {
      throw UserException.validationError()
          .message("Temporary tables are not allowed to be created / dropped " +
                  "outside of default temporary workspace [%s].",
              config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE))
          .build(logger);
    }

    if (!schema.isMutable()) {
      throw UserException.validationError()
          .message("Unable to create or drop temporary table. Schema [%s] is immutable.", schema.getFullSchemaName())
          .build(logger);
    }

    if (!(schema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
      // Report the schema by its full name, like the other messages in this method.
      throw UserException.validationError()
          .message("Temporary workspace [%s] must be file-based, instance of " +
              "WorkspaceSchemaFactory.WorkspaceSchema", schema.getFullSchemaName())
          .build(logger);
    }

    return (WorkspaceSchemaFactory.WorkspaceSchema) schema;
  }

  /**
   * If table schema is not indicated in sql call, returns temporary workspace.
   * If schema is indicated, resolves to mutable table schema.
   *
   * @param tableSchema table schema
   * @param defaultSchema default schema
   * @param config drill config
   * @return resolved schema; may be null when no table schema is indicated and the temporary workspace
   *         is not found
   */
  public static AbstractSchema resolveToTemporarySchema(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) {
    if (tableSchema.isEmpty()) {
      return getTemporaryWorkspace(defaultSchema, config);
    }
    return resolveToMutableDrillSchema(defaultSchema, tableSchema);
  }

}