aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
blob: ccd7e36361b1c439936f44d61a3320c5856fb309 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.Version;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.GetSchemasReq;
import org.apache.drill.exec.proto.UserProtos.GetSchemasResp;
import org.apache.drill.exec.proto.UserProtos.GetServerMetaReq;
import org.apache.drill.exec.proto.UserProtos.GetServerMetaResp;
import org.apache.drill.exec.proto.UserProtos.GetTablesReq;
import org.apache.drill.exec.proto.UserProtos.GetTablesResp;
import org.apache.drill.exec.proto.UserProtos.LikeFilter;
import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.RpcEndpointInfos;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.ChannelClosedException;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.NonTransientRpcException;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.TransportCheck;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.rpc.user.UserRpcUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.SettableFuture;

import io.netty.channel.EventLoopGroup;

/**
 * Thin wrapper around a UserClient that handles connect/close and transforms
 * String into ByteBuf.
 */
public class DrillClient implements Closeable, ConnectionThrottle {
  /** Client name used unless {@link #setClientName(String)} is called before connecting. */
  public static final String DEFAULT_CLIENT_NAME = "Apache Drill Java client";

  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);

  // Shared JSON mapper; used when plan fragments are combined into a single plan string.
  private static final ObjectMapper objectMapper = new ObjectMapper();
  // Configuration supplied at construction time.
  private final DrillConfig config;
  // Underlying RPC client; stays null until connect(String, Properties) creates it.
  private UserClient client;
  // Connection properties derived from the Properties passed to connect(String, Properties).
  private DrillProperties properties;
  // Supplied by the caller, or created in connect(String, Properties) when this client owns the ZK connection.
  private volatile ClusterCoordinator clusterCoordinator;
  // Set to true after a successful connect and back to false in close().
  private volatile boolean connected = false;
  // Allocator supplied by the caller, or a root allocator created by this client (see ownsAllocator).
  private final BufferAllocator allocator;
  // Number of attempts made by reconnect(); read from ExecConstants.BIT_RETRY_TIMES.
  private int reconnectTimes;
  // Delay before each reconnect attempt, passed to Thread.sleep (milliseconds); read from ExecConstants.BIT_RETRY_DELAY.
  private int reconnectDelay;
  // Initial value read from ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES; handed to the UserClient on connect.
  private boolean supportComplexTypes;
  // true when no coordinator was supplied and the connection is not direct; close() then closes the coordinator.
  private final boolean ownsZkConnection;
  // true when no allocator was supplied; close() then closes the allocator.
  private final boolean ownsAllocator;
  private final boolean isDirectConnection; // true if the connection bypasses zookeeper and connects directly to a drillbit
  // Created in connect(String, Properties) and shut down in close().
  private EventLoopGroup eventLoopGroup;
  // Created in connect(String, Properties) and shut down in close().
  private ExecutorService executor;
  // Name handed to the UserClient on connect.
  private String clientName = DEFAULT_CLIENT_NAME;

  /**
   * Creates a non-direct client using the configuration returned by {@link DrillConfig#create()}.
   * No coordinator or allocator is supplied, so the delegated constructors pass {@code null} for both.
   *
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient() throws OutOfMemoryException {
    this(DrillConfig.create(), false);
  }

  /**
   * Creates a client using the configuration returned by {@link DrillConfig#create()}.
   *
   * @param isDirect {@code true} to connect directly to a drillbit instead of going through ZooKeeper
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(boolean isDirect) throws OutOfMemoryException {
    this(DrillConfig.create(), isDirect);
  }

  /**
   * Creates a non-direct client using the configuration returned by
   * {@code DrillConfig.create(fileName)}.
   *
   * @param fileName value forwarded to {@code DrillConfig.create(String)}; presumably the name of
   *        a configuration resource (TODO confirm)
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(String fileName) throws OutOfMemoryException {
    this(DrillConfig.create(fileName), false);
  }

  /**
   * Creates a non-direct client with the given configuration and no caller-supplied coordinator.
   *
   * @param config configuration used by this client
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(DrillConfig config) throws OutOfMemoryException {
    this(config, null, false);
  }

  /**
   * Creates a client with the given configuration and no caller-supplied coordinator.
   *
   * @param config configuration used by this client
   * @param isDirect {@code true} to connect directly to a drillbit instead of going through ZooKeeper
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(DrillConfig config, boolean isDirect)
      throws OutOfMemoryException {
    this(config, null, isDirect);
  }

  /**
   * Creates a non-direct client with the given configuration and coordinator; no allocator is
   * supplied.
   *
   * @param config configuration used by this client
   * @param coordinator cluster coordinator to use, or {@code null} to let the client create its own on connect
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(DrillConfig config, ClusterCoordinator coordinator)
    throws OutOfMemoryException {
    this(config, coordinator, null, false);
  }

  /**
   * Creates a client with the given configuration and coordinator; no allocator is supplied.
   *
   * @param config configuration used by this client
   * @param coordinator cluster coordinator to use, or {@code null}
   * @param isDirect {@code true} to connect directly to a drillbit instead of going through ZooKeeper
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(DrillConfig config, ClusterCoordinator coordinator, boolean isDirect)
    throws OutOfMemoryException {
    this(config, coordinator, null, isDirect);
  }

  /**
   * Creates a non-direct client with the given configuration, coordinator and allocator.
   *
   * @param config configuration used by this client
   * @param coordinator cluster coordinator to use, or {@code null}
   * @param allocator buffer allocator to use, or {@code null} to let the client create its own
   * @throws OutOfMemoryException declared by the signature; presumably raised when the
   *         client's allocator cannot be created (TODO confirm)
   */
  public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator)
      throws OutOfMemoryException {
    this(config, coordinator, allocator, false);
  }

  /**
   * Creates a client from explicitly supplied collaborators. All other constructors end up here.
   *
   * @param config configuration used by this client; also the source of the reconnect settings
   *        and of the initial complex-type support flag
   * @param coordinator cluster coordinator to use, or {@code null}
   * @param allocator buffer allocator to use, or {@code null} to have a root allocator created
   *        from {@code config}
   * @param isDirect if {@code true}, the client connects directly to the drillbit instead of
   *        going through ZooKeeper
   */
  public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator, boolean isDirect) {
    this.config = config;
    this.clusterCoordinator = coordinator;
    this.isDirectConnection = isDirect;

    // The client is responsible for the ZooKeeper connection only when it is not connecting
    // directly and the caller did not hand in a coordinator.
    this.ownsZkConnection = !isDirect && coordinator == null;

    // Likewise, an allocator is created (and later closed) here only when none was supplied.
    this.ownsAllocator = (allocator == null);
    if (ownsAllocator) {
      this.allocator = RootAllocatorFactory.newRoot(config);
    } else {
      this.allocator = allocator;
    }

    this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
    this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY);
    this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES);
  }

  /**
   * Returns the configuration this client was constructed with.
   *
   * @return the client configuration
   */
  public DrillConfig getConfig() {
    return this.config;
  }

  /**
   * Forwards the auto-read setting to the underlying {@link UserClient}.
   *
   * The underlying client is only created by {@code connect(String, Properties)}, so calling this
   * before connecting fails with a {@link NullPointerException}.
   *
   * @param enableAutoRead value passed through to {@code UserClient.setAutoRead}
   */
  @Override
  public void setAutoRead(boolean enableAutoRead) {
    this.client.setAutoRead(enableAutoRead);
  }

  /**
   * Sets the name this client reports when it connects.
   *
   * When never called, the name is {@link #DEFAULT_CLIENT_NAME}.
   *
   * @param name the client name
   *
   * @throws IllegalStateException if called after a connection has been established.
   * @throws NullPointerException if client name is null
   */
  public void setClientName(String name) {
    // The connection state is checked first, so it wins over a null name.
    checkState(!connected, "Attempted to modify client connection property after connection has been established.");
    final String validatedName = checkNotNull(name, "client name should not be null");
    this.clientName = validatedName;
  }

  /**
   * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned
   * result set. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
   *
   * The initial value is read from the client configuration
   * ({@code ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES}) at construction time.
   *
   * @param supportComplexTypes whether complex types are accepted
   * @throws IllegalStateException if called after a connection has been established.
   */
  public void setSupportComplexTypes(boolean supportComplexTypes) {
    checkState(!connected, "Attempted to modify client connection property after connection has been established.");
    this.supportComplexTypes = supportComplexTypes;
  }

  /**
   * Connects the client to a Drillbit server using no ZooKeeper connection string and an empty
   * set of connection properties.
   *
   * @throws RpcException if the connection cannot be established
   */
  public void connect() throws RpcException {
    final Properties noProperties = new Properties();
    connect(null, noProperties);
  }

  /**
   * Starts a connection from client to server without an explicit ZooKeeper connection string.
   *
   * @param props - not null {@link Properties} filled with connection url parameters
   * @throws RpcException if the connection cannot be established
   */
  public void connect(Properties props) throws RpcException {
    final String noZkConnectString = null;
    connect(noZkConnectString, props);
  }

  /**
   * Populates the endpointlist with drillbits information provided in the connection string by client.
   * For direct connection we can have connection string with drillbit property as below:
   * <dl>
   *   <dt>drillbit=ip</dt>
   *   <dd>use the ip specified as the Foreman ip with default port in config file</dd>
   *   <dt>drillbit=ip:port</dt>
   *   <dd>use the ip and port specified as the Foreman ip and port</dd>
   *   <dt>drillbit=ip1:port1,ip2:port2,...</dt>
   *   <dd>randomly select the ip and port pair from the specified list as the Foreman ip and port.</dd>
   * </dl>
   *
   * Blank entries between commas are ignored. This method only parses; the random selection
   * mentioned above is performed by the caller.
   *
   * @param drillbits string with drillbit value provided in connection string; {@code null} is
   *        treated like an empty string
   * @param defaultUserPort string with default userport of drillbit specified in config file
   * @return list of drillbit endpoints parsed from connection string, never empty
   * @throws InvalidConnectionInfoException if the connection string has invalid or no drillbit information
   */
  static List<DrillbitEndpoint> parseAndVerifyEndpoints(String drillbits, String defaultUserPort)
                                throws InvalidConnectionInfoException {
    // A missing drillbit property arrives here as null. Report it like an empty value instead of
    // letting trim() fail with a NullPointerException.
    final String trimmedDrillbits = (drillbits == null) ? "" : drillbits.trim();
    if (trimmedDrillbits.isEmpty()) {
      throw new InvalidConnectionInfoException("No drillbit information specified in the connection string");
    }

    final List<DrillbitEndpoint> endpointList = new ArrayList<>();

    // Fetch ip address and port information for each drillbit and populate the list
    for (String drillbit : trimmedDrillbits.split(",")) {

      // Trim surrounding spaces and ignore entries that are empty.
      drillbit = drillbit.trim();
      if (drillbit.isEmpty()) {
        continue;
      }

      // Reject entries that are only ":" or only ":port"
      if (drillbit.charAt(0) == ':') {
        throw new InvalidConnectionInfoException("Malformed connection string with drillbit hostname or " +
                                                   "hostaddress missing for an entry: " + drillbit);
      }

      // The entry starts with a host, so the first element of the split is the host part.
      final String[] drillbitInfo = drillbit.split(":");

      // More than two parts means more than one port was given
      if (drillbitInfo.length > 2) {
        throw new InvalidConnectionInfoException("Malformed connection string with more than one port in a " +
                                                   "drillbit entry: " + drillbit);
      }

      // Spaces may surround the host and the port inside an entry; strip them before use.
      final String ipAddress = drillbitInfo[0].trim();
      final String port = (drillbitInfo.length == 2) ? drillbitInfo[1].trim() : defaultUserPort;

      try {
        final DrillbitEndpoint endpoint = DrillbitEndpoint.newBuilder()
                                          .setAddress(ipAddress)
                                          .setUserPort(Integer.parseInt(port))
                                          .build();

        endpointList.add(endpoint);
      } catch (NumberFormatException e) {
        throw new InvalidConnectionInfoException("Malformed port value in entry: " + ipAddress + ":" + port + " " +
                                                   "passed in connection string");
      }
    }

    // Input such as "," contains separators only and yields no endpoint at all.
    if (endpointList.isEmpty()) {
      throw new InvalidConnectionInfoException("No valid drillbit information specified in the connection string");
    }
    return endpointList;
  }

  /**
   * Starts a connection from client to server. Does nothing if the client is already connected.
   *
   * For a direct connection the candidate drillbits are parsed from the
   * {@code DrillProperties.DRILLBIT_CONNECTION} property; otherwise they are the online endpoints
   * reported by the cluster coordinator. The candidates are shuffled and tried in order until one
   * connection succeeds or the allowed number of tries is used up.
   *
   * @param connect - Zookeeper connection string provided at connection URL; only used when this
   *        client creates its own cluster coordinator
   * @param props - not null {@link Properties} filled with connection url parameters
   * @throws RpcException if ZooKeeper setup fails, if a non-transient failure occurs, or if every
   *         allowed try fails (the last failure is rethrown)
   * @throws IllegalStateException if the cluster coordinator reports no online drillbit
   */
  public synchronized void connect(String connect, Properties props) throws RpcException {
    if (connected) {
      return;
    }

    properties = DrillProperties.createFromProperties(props);
    final List<DrillbitEndpoint> endpoints = new ArrayList<>();

    if (isDirectConnection) {
      // Populate the endpoints list with all the drillbit information provided in the connection string
      endpoints.addAll(parseAndVerifyEndpoints(properties.getProperty(DrillProperties.DRILLBIT_CONNECTION),
                                               config.getString(ExecConstants.INITIAL_USER_PORT)));
    } else {
      if (ownsZkConnection) {
        try {
          this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
          // NOTE(review): 10000 is presumably a start-up timeout in milliseconds - confirm.
          this.clusterCoordinator.start(10000);
        } catch (Exception e) {
          throw new RpcException("Failure setting up ZK for client.", e);
        }
      }
      // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are
      // in QUIESCENT state. This avoids the clients connecting to drillbits that are
      // shutting down, thereby reducing the chances of query failures.
      endpoints.addAll(clusterCoordinator.getOnlineEndPoints());
      // Make sure we have at least one endpoint in the list
      checkState(!endpoints.isEmpty(), "No active Drillbit endpoint found from ZooKeeper. Check connection parameters?");
    }

    // shuffle the collection so that the endpoints are tried in random order
    Collections.shuffle(endpoints);

    // The event loop group and the executor are created here and released in close().
    eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
    executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new NamedThreadFactory("drill-client-executor-")) {
      @Override
      protected void afterExecute(final Runnable r, final Throwable t) {
        // Log anything a task let escape so that it does not disappear silently.
        if (t != null) {
          logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
        }
        super.afterExecute(r, t);
      }
    };

    // Number of tries requested in the connection string (default 5), capped by the number of endpoints.
    final String connectTriesConf = properties.getProperty(DrillProperties.TRIES, "5");
    int connectTriesVal;
    try {
      connectTriesVal = Math.min(endpoints.size(), Integer.parseInt(connectTriesConf));
    } catch (NumberFormatException e) {
      throw new InvalidConnectionInfoException("Invalid tries value: " + connectTriesConf + " specified in " +
                                               "connection string");
    }

    // If the value provided in the connection string is <=0 then override with 1 since we want to try connecting
    // at least once
    connectTriesVal = Math.max(1, connectTriesVal);

    int triedEndpointIndex = 0;
    DrillbitEndpoint endpoint;

    while (triedEndpointIndex < connectTriesVal) {
      endpoint = endpoints.get(triedEndpointIndex);

      // Set in both props and properties since props is passed to UserClient
      // TODO: Logically here it's doing putIfAbsent, please change to use that api once JDK 8 is minimum required
      // version
      // NOTE(review): because the key is only set when absent, a retry against a different endpoint
      // keeps the address of the first endpoint as SERVICE_HOST - confirm this is intended.
      if (!properties.containsKey(DrillProperties.SERVICE_HOST)) {
        properties.setProperty(DrillProperties.SERVICE_HOST, endpoint.getAddress());
        props.setProperty(DrillProperties.SERVICE_HOST, endpoint.getAddress());
      }

      // Note: the properties member is a DrillProperties instance which lower cases names of
      // properties. That does not work too well with properties that are mixed case.
      // For the user client several properties are mixed case, so we do not use the properties member
      // but instead pass the props parameter.
      client = new UserClient(clientName, config, props, supportComplexTypes, allocator, eventLoopGroup, executor, endpoint);
      logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());

      try {
        connect(endpoint);
        connected = true;
        logger.info("Successfully connected to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
        break;
      } catch (NonTransientRpcException ex) {
        // A non-transient failure is not retried against any other endpoint.
        logger.error("Connection to {}:{} failed with error {}. Not retrying anymore", endpoint.getAddress(),
                     endpoint.getUserPort(), ex.getMessage());
        throw ex;
      } catch (RpcException ex) {
        ++triedEndpointIndex;
        logger.error("Attempt {}: Failed to connect to server {}:{}", triedEndpointIndex, endpoint.getAddress(),
                     endpoint.getUserPort());

        // Throw exception when we have exhausted all the tries without having a successful connection
        if (triedEndpointIndex == connectTriesVal) {
          throw ex;
        }

        // Close the connection here to avoid calling close twice in case when all tries are exhausted.
        // Since DrillClient.close is also calling client.close
        client.close();
      }
    }
  }

  /**
   * Creates the event loop group used for the client connection by delegating to
   * {@code TransportCheck.createEventLoopGroup}.
   *
   * @param size forwarded unchanged; presumably the number of event loop threads (TODO confirm)
   * @param prefix forwarded unchanged; presumably the thread-name prefix (TODO confirm)
   * @return the newly created event loop group
   */
  protected static EventLoopGroup createEventLoop(int size, String prefix) {
    final EventLoopGroup group = TransportCheck.createEventLoopGroup(size, prefix);
    return group;
  }

  /**
   * Tries to re-establish the connection to a drillbit if the current one is no longer active.
   *
   * Up to {@code reconnectTimes} attempts are made. Each attempt first sleeps for
   * {@code reconnectDelay} milliseconds, then asks the cluster coordinator for the online
   * endpoints, closes the current client connection and connects to a randomly chosen endpoint.
   * A failed attempt is logged and followed by the next one.
   *
   * If the calling thread is interrupted while waiting, its interrupt status is restored and no
   * further attempt is made.
   *
   * @return {@code true} if the client is active or was reconnected, {@code false} if every
   *         attempt failed or the thread was interrupted
   */
  public synchronized boolean reconnect() {
    if (client.isActive()) {
      return true;
    }
    for (int attemptsLeft = reconnectTimes; attemptsLeft > 0; attemptsLeft--) {
      try {
        Thread.sleep(reconnectDelay);
        // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are
        // in QUIESCENT state. This avoids the clients connecting to drillbits that are
        // shutting down thereby reducing the chances of query failures.
        final List<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getOnlineEndPoints());
        if (endpoints.isEmpty()) {
          continue;
        }
        client.close();
        Collections.shuffle(endpoints);
        connect(endpoints.get(0));
        return true;
      } catch (InterruptedException e) {
        // Do not swallow the interrupt: restore the flag for the caller and stop retrying, since
        // every further sleep would be interrupted immediately anyway.
        Thread.currentThread().interrupt();
        logger.warn("Interrupted while waiting to reconnect; giving up.");
        return false;
      } catch (Exception e) {
        // Reconnecting is best effort: record why this attempt failed and try again.
        logger.debug("Reconnect attempt failed, {} attempt(s) remaining.", attemptsLeft - 1, e);
      }
    }
    return false;
  }

  /**
   * Connects the current {@link UserClient} to the given endpoint using the stored connection
   * properties and the credentials derived from them.
   *
   * @param endpoint drillbit to connect to
   * @throws RpcException if the underlying client fails to connect
   */
  private void connect(DrillbitEndpoint endpoint) throws RpcException {
    final UserBitShared.UserCredentials credentials = getUserCredentials();
    client.connect(endpoint, properties, credentials);
    logger.info("Foreman drillbit is {}", endpoint.getAddress());
  }

  /**
   * Returns the buffer allocator used by this client: either the one supplied at construction
   * time or the root allocator the client created for itself.
   *
   * @return the client's allocator
   */
  public BufferAllocator getAllocator() {
    return this.allocator;
  }

  /**
   * Closes this client's connection to the server and releases the resources the client created.
   *
   * In order: the underlying client is closed, the allocator is closed if this client created it,
   * the cluster coordinator is closed if this client created it, and the event loop group and
   * executor are shut down. Finally the client is marked as not connected.
   */
  @Override
  public void close() {
    if (client != null) {
      client.close();
    }

    // Only an allocator created by this client is released here.
    if (ownsAllocator && allocator != null) {
      DrillAutoCloseables.closeNoChecked(allocator);
    }

    // Same for the coordinator; a failure to close it is logged and does not stop the shutdown.
    if (ownsZkConnection && clusterCoordinator != null) {
      try {
        clusterCoordinator.close();
        clusterCoordinator = null;
      } catch (Exception e) {
        logger.warn("Error while closing Cluster Coordinator.", e);
      }
    }

    if (eventLoopGroup != null) {
      eventLoopGroup.shutdownGracefully();
    }
    if (executor != null) {
      executor.shutdownNow();
    }

    connected = false;
  }


  /**
   * Returns the endpoint information advertised by the server. Only available after connecting.
   *
   * @return the server information, or null if not connected or if the server
   *         doesn't provide the information
   * @deprecated use {@code DrillClient#getServerVersion()}
   */
  @Deprecated
  public RpcEndpointInfos getServerInfos() {
    if (client == null) {
      return null;
    }
    return client.getServerInfos();
  }

  /**
   * Returns the server name. Only available after connecting.
   *
   * @return the server name, or null if not connected or if the server
   *         doesn't provide the name
   */
  public String getServerName() {
    if (client == null || client.getServerInfos() == null) {
      return null;
    }
    return client.getServerInfos().getName();
  }

  /**
   * Returns the server version. Only available after connecting.
   *
   * @return the server version, or null if not connected or if the server
   *         doesn't provide the version
   */
  public Version getServerVersion() {
    if (client == null || client.getServerInfos() == null) {
      return null;
    }
    return UserRpcUtils.getVersion(client.getServerInfos());
  }

  /**
   * Requests the server meta information.
   *
   * The meta information describes the server, for example the available functions
   * or the identifier quoting string used by the current session.
   *
   * @return a future to the server meta response
   */
  public DrillRpcFuture<GetServerMetaResp> getServerMeta() {
    final GetServerMetaReq request = GetServerMetaReq.getDefaultInstance();
    return client.send(RpcType.GET_SERVER_META, request, GetServerMetaResp.class);
  }

  /**
   * Returns the list of methods supported by the server based on its advertised information.
   *
   * @return the set of capabilities, or {@code null} if the underlying client has not been
   *         created yet
   */
  public Set<ServerMethod> getSupportedMethods() {
    if (client == null) {
      return null;
    }
    return ServerMethod.getSupportedMethods(client.getSupportedMethods(), client.getServerInfos());
  }

  /**
   * Submits a string based query plan for execution and returns the result batches. Supported query types are:
   * <p><ul>
   *  <li>{@link QueryType#LOGICAL}
   *  <li>{@link QueryType#PHYSICAL}
   *  <li>{@link QueryType#SQL}
   * </ul>
   *
   * @param type Query type
   * @param plan Query to execute
   * @return the result batches collected by the listener
   * @throws RpcException if the query fails
   * @throws IllegalArgumentException if {@code type} is not one of the supported query types
   */
  public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
    final boolean supportedType =
        type == QueryType.LOGICAL || type == QueryType.PHYSICAL || type == QueryType.SQL;
    checkArgument(supportedType,
        String.format("Only query types %s, %s and %s are supported in this API",
            QueryType.LOGICAL, QueryType.PHYSICAL, QueryType.SQL));

    final UserProtos.RunQuery.Builder request = newBuilder();
    request.setResultsMode(STREAM_FULL);
    request.setType(type);
    request.setPlan(plan);
    final UserProtos.RunQuery query = request.build();

    // The listener collects the batches; its results are handed back to the caller.
    final ListHoldingResultsListener resultsHolder = new ListHoldingResultsListener(query);
    client.submitQuery(resultsHolder, query);
    return resultsHolder.getResults();
  }

  /**
   * Plans a query without executing it.
   *
   * @param type query type
   * @param query query to plan
   * @param isSplitPlan - option to tell whether to return single or split plans for a query
   * @return list of PlanFragments that can be used later on in {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType, java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)}
   * to run a query without additional planning
   */
  public DrillRpcFuture<QueryPlanFragments> planQuery(QueryType type, String query, boolean isSplitPlan) {
    final GetQueryPlanFragments.Builder request = GetQueryPlanFragments.newBuilder();
    request.setQuery(query);
    request.setType(type);
    request.setSplitPlan(isSplitPlan);
    return client.planQuery(request.build());
  }

  /**
   * Run query based on list of fragments that were supposedly produced during query planning phase. Supported
   * query type is {@link QueryType#EXECUTION}
   *
   * The plan text attached to the request is a JSON array built from the JSON of the individual
   * fragments.
   *
   * @param type query type; must be {@link QueryType#EXECUTION}
   * @param planFragments fragments to execute
   * @param resultsListener listener that receives the query results
   * @throws RpcException if the fragment JSON cannot be read or combined, or if submission fails
   * @throws IllegalArgumentException if {@code type} is not {@link QueryType#EXECUTION}
   */
  public void runQuery(QueryType type, List<PlanFragment> planFragments, UserResultsListener resultsListener)
      throws RpcException {
    // QueryType can be only executional
    checkArgument((QueryType.EXECUTION == type), "Only EXECUTION type query is supported with PlanFragments");
    // setting Plan on RunQuery will be used for logging purposes and therefore can not be null
    // since there is no Plan string provided we will create a JsonArray out of individual fragment Plans
    final ArrayNode jsonArray = objectMapper.createArrayNode();
    for (PlanFragment fragment : planFragments) {
      try {
        jsonArray.add(objectMapper.readTree(fragment.getFragmentJson()));
      } catch (IOException e) {
        // SLF4J only substitutes "{}" placeholders; the trailing exception is logged with its stack trace.
        logger.error("Exception while trying to read PlanFragment JSON for {}",
            QueryIdHelper.getQueryId(fragment.getHandle().getQueryId()), e);
        throw new RpcException(e);
      }
    }
    final String fragmentsToJsonString;
    try {
      fragmentsToJsonString = objectMapper.writeValueAsString(jsonArray);
    } catch (JsonProcessingException e) {
      logger.error("Exception while trying to get JSONString from Array of individual Fragments Json", e);
      throw new RpcException(e);
    }
    final UserProtos.RunQuery query = newBuilder().setType(type).addAllFragments(planFragments)
        .setPlan(fragmentsToJsonString)
        .setResultsMode(STREAM_FULL).build();
    client.submitQuery(resultsListener, query);
  }

  /*
   * Builds the UserCredentials message from the connection properties. When no user name is
   * present in the properties (missing or empty), "anonymous" is used.
   */
  private UserBitShared.UserCredentials getUserCredentials() {
    final String configuredUser = properties.getProperty(DrillProperties.USER);
    final String effectiveUser = Strings.isNullOrEmpty(configuredUser) ? "anonymous" : configuredUser;

    final UserBitShared.UserCredentials.Builder credentials = UserBitShared.UserCredentials.newBuilder();
    credentials.setUserName(effectiveUser);
    return credentials.build();
  }

  /**
   * Sends a request to cancel the given query.
   *
   * @param id id of the query to cancel
   * @return a future to the server's acknowledgement
   */
  public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
    // Guarded so that the id is only formatted when debug logging is on.
    if (logger.isDebugEnabled()) {
      final String queryIdText = QueryIdHelper.getQueryId(id);
      logger.debug("Cancelling query {}", queryIdText);
    }
    final DrillRpcFuture<Ack> ack = client.send(RpcType.CANCEL_QUERY, id, Ack.class);
    return ack;
  }

  /**
   * Sends a request to resume the given paused query.
   *
   * @param queryId id of the query to resume
   * @return a future to the server's acknowledgement
   */
  public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) {
    // Guarded so that the id is only formatted when debug logging is on.
    if (logger.isDebugEnabled()) {
      final String queryIdText = QueryIdHelper.getQueryId(queryId);
      logger.debug("Resuming query {}", queryIdText);
    }
    final DrillRpcFuture<Ack> ack = client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class);
    return ack;
  }

  /**
   * Get the list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
   *
   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
   * @return A future to the list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table
   *         satisfying the given filters.
   */
  public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) {
    final GetCatalogsReq.Builder request = GetCatalogsReq.newBuilder();

    // A null filter is simply left unset on the request.
    if (catalogNameFilter != null) {
      request.setCatalogNameFilter(catalogNameFilter);
    }

    final GetCatalogsReq message = request.build();
    return client.send(RpcType.GET_CATALOGS, message, GetCatalogsResp.class);
  }

  /**
   * Get the list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
   *
   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
   * @return A future to the list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table
   *         satisfying the given filters.
   */
  public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) {
    final GetSchemasReq.Builder request = GetSchemasReq.newBuilder();

    // Null filters are simply left unset on the request.
    if (catalogNameFilter != null) {
      request.setCatalogNameFilter(catalogNameFilter);
    }
    if (schemaNameFilter != null) {
      request.setSchemaNameFilter(schemaNameFilter);
    }

    final GetSchemasReq message = request.build();
    return client.send(RpcType.GET_SCHEMAS, message, GetSchemasResp.class);
  }

  /**
   * Requests the tables listed in the <code>INFORMATION_SCHEMA.TABLES</code> table that match the
   * given filters.
   *
   * @param catalogNameFilter Filter on <code>catalog name</code>. A null filter is left unset on the request.
   * @param schemaNameFilter Filter on <code>schema name</code>. A null filter is left unset on the request.
   * @param tableNameFilter Filter on <code>table name</code>. A null filter is left unset on the request.
   * @param tableTypeFilter Filter on <code>table type</code>. A null list adds no table types to the request.
   * @return Future holding the {@link GetTablesResp} response.
   */
  public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
      LikeFilter tableNameFilter, List<String> tableTypeFilter) {
    final GetTablesReq.Builder builder = GetTablesReq.newBuilder();

    // Each filter is optional; only the non-null ones are set on the request.
    if (null != catalogNameFilter) {
      builder.setCatalogNameFilter(catalogNameFilter);
    }
    if (null != schemaNameFilter) {
      builder.setSchemaNameFilter(schemaNameFilter);
    }
    if (null != tableNameFilter) {
      builder.setTableNameFilter(tableNameFilter);
    }
    if (null != tableTypeFilter) {
      builder.addAllTableTypeFilter(tableTypeFilter);
    }

    final GetTablesReq request = builder.build();
    return client.send(RpcType.GET_TABLES, request, GetTablesResp.class);
  }

  /**
   * Requests the columns listed in the <code>INFORMATION_SCHEMA.COLUMNS</code> table that match the
   * given filters.
   *
   * @param catalogNameFilter Filter on <code>catalog name</code>. A null filter is left unset on the request.
   * @param schemaNameFilter Filter on <code>schema name</code>. A null filter is left unset on the request.
   * @param tableNameFilter Filter on <code>table name</code>. A null filter is left unset on the request.
   * @param columnNameFilter Filter on <code>column name</code>. A null filter is left unset on the request.
   * @return Future holding the {@link GetColumnsResp} response.
   */
  public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
      LikeFilter tableNameFilter, LikeFilter columnNameFilter) {
    final GetColumnsReq.Builder builder = GetColumnsReq.newBuilder();

    // Each filter is optional; only the non-null ones are set on the request.
    if (null != catalogNameFilter) {
      builder.setCatalogNameFilter(catalogNameFilter);
    }
    if (null != schemaNameFilter) {
      builder.setSchemaNameFilter(schemaNameFilter);
    }
    if (null != tableNameFilter) {
      builder.setTableNameFilter(tableNameFilter);
    }
    if (null != columnNameFilter) {
      builder.setColumnNameFilter(columnNameFilter);
    }

    final GetColumnsReq request = builder.build();
    return client.send(RpcType.GET_COLUMNS, request, GetColumnsResp.class);
  }

  /**
   * Sends a <code>CREATE_PREPARED_STATEMENT</code> request for the given <code>query</code>.
   *
   * @param query SQL query text to set on the request.
   * @return Future holding the {@link CreatePreparedStatementResp} response.
   */
  public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) {
    final CreatePreparedStatementReq.Builder builder = CreatePreparedStatementReq.newBuilder();
    builder.setSqlQuery(query);
    final CreatePreparedStatementReq request = builder.build();

    return client.send(RpcType.CREATE_PREPARED_STATEMENT, request, CreatePreparedStatementResp.class);
  }

  /**
   * Submits the given prepared statement for execution; results are delivered to the supplied listener.
   *
   * @param preparedStatementHandle Prepared statement handle returned in response to
   *                                {@link #createPreparedStatement(String)}.
   * @param resultsListener {@link UserResultsListener} instance that receives the query results.
   */
  public void executePreparedStatement(final PreparedStatementHandle preparedStatementHandle,
      final UserResultsListener resultsListener) {
    final RunQuery.Builder request = newBuilder();
    request.setResultsMode(STREAM_FULL);
    request.setType(QueryType.PREPARED_STATEMENT);
    request.setPreparedStatementHandle(preparedStatementHandle);

    final RunQuery preparedQuery = request.build();
    client.submitQuery(resultsListener, preparedQuery);
  }

  /**
   * Submits the given prepared statement for execution and waits for the collected results.
   *
   * @param preparedStatementHandle Prepared statement handle returned in response to
   *                                {@link #createPreparedStatement(String)}.
   * @return List of {@link QueryDataBatch}s. The caller is responsible for releasing the returned batches.
   * @throws RpcException if the results could not be obtained.
   */
  @VisibleForTesting
  public List<QueryDataBatch> executePreparedStatement(final PreparedStatementHandle preparedStatementHandle)
      throws RpcException {
    final RunQuery.Builder request = newBuilder();
    request.setResultsMode(STREAM_FULL);
    request.setType(QueryType.PREPARED_STATEMENT);
    request.setPreparedStatementHandle(preparedStatementHandle);
    final RunQuery preparedQuery = request.build();

    // The listener keeps the query so that it can resubmit it after a reconnect.
    final ListHoldingResultsListener collector = new ListHoldingResultsListener(preparedQuery);
    client.submitQuery(collector, preparedQuery);
    return collector.getResults();
  }

  /**
   * Submits a plan of the given type for execution; results are delivered to the supplied listener.
   *
   * @param type the {@link QueryType} set on the request
   * @param plan the plan to execute
   * @param resultsListener {@link UserResultsListener} instance that receives the query results
   */
  public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) {
    final RunQuery request = newBuilder()
        .setResultsMode(STREAM_FULL)
        .setType(type)
        .setPlan(plan)
        .build();
    client.submitQuery(resultsListener, request);
  }

  /**
   * {@link UserResultsListener} that collects every arriving {@link QueryDataBatch} into a list and
   * publishes that list through a future once the query completes, or publishes the failure if the
   * submission fails.
   *
   * <p>If the submission fails because the channel was closed, the listener attempts a reconnect and
   * resubmits the same query with itself as the listener.</p>
   */
  private class ListHoldingResultsListener implements UserResultsListener {
    // Vector is a synchronized list.
    // NOTE(review): presumably batches are added on an RPC thread while the caller waits in getResults() - confirm.
    private final Vector<QueryDataBatch> results = new Vector<>();
    private final SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
    // Kept so the query can be resubmitted after a successful reconnect.
    private final UserProtos.RunQuery query;

    /**
     * @param query The query this listener collects results for; it is resubmitted after a reconnect.
     */
    public ListHoldingResultsListener(UserProtos.RunQuery query) {
      logger.debug( "Listener created for query \"\"\"{}\"\"\"", query );
      this.query = query;
    }

    /**
     * Handles a failed submission. When the cause is a {@link ChannelClosedException} and
     * reconnecting succeeds, the query is resubmitted; in every other case the failure is
     * propagated to the caller waiting in {@link #getResults()}.
     */
    @Override
    public void submissionFailed(UserException ex) {
      // or  !client.isActive()
      // reconnect() is only attempted when the failure was caused by a closed channel.
      if (ex.getCause() instanceof ChannelClosedException && reconnect()) {
        try {
          client.submitQuery(this, query);
        } catch (Exception e) {
          fail(e);
        }
        return;
      }
      fail(ex);
    }

    /**
     * Completes the future with the batches collected so far.
     */
    @Override
    public void queryCompleted(QueryState state) {
      future.set(results);
    }

    /**
     * Completes the future exceptionally. A future can only be completed once, so no value is set
     * afterwards; the batches collected so far are released by {@link #getResults()} when it
     * observes the failure.
     */
    private void fail(Exception ex) {
      logger.debug("Submission failed.", ex);
      future.setException(ex);
    }

    /**
     * Stores the arrived batch; the throttle is not used.
     */
    @Override
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
      logger.debug("Result arrived:  Result: {}", result );
      results.add(result);
    }

    /**
     * Waits for the query to finish and returns the collected batches.
     *
     * @return The collected batches. It is the responsibility of the caller to release them.
     * @throws RpcException if waiting for the results failed; in that case the batches collected
     *                      so far are released here.
     */
    public List<QueryDataBatch> getResults() throws RpcException{
      try {
        return future.get();
      } catch (Throwable t) {
        if (t instanceof InterruptedException) {
          // Restore the interrupt flag so that code further up the stack can still observe it.
          Thread.currentThread().interrupt();
        }

        /*
         * Since we're not going to return the result to the caller
         * to clean up, we have to do it.
         */
        for (final QueryDataBatch queryDataBatch : results) {
          queryDataBatch.release();
        }

        throw RpcException.mapException(t);
      }
    }

    /**
     * Logs the query id at debug level.
     */
    @Override
    public void queryIdArrived(QueryId queryId) {
      if (logger.isDebugEnabled()) {
        logger.debug("Query ID arrived: {}", QueryIdHelper.getQueryId(queryId));
      }
    }
  }
}