diff options
author | Oliver Szabo <oleewere@gmail.com> | 2018-07-09 22:44:06 +0200 |
---|---|---|
committer | Oliver Szabo <oleewere@gmail.com> | 2018-07-09 22:47:51 +0200 |
commit | b8c2108cad777c4daad716ae11868b9c006592b9 (patch) | |
tree | 7bcbd38c1b7e7d8cc1e984badc993e3ccd7b772f | |
parent | 219a23623349bc3715f6c667369477bd32ef1b0b (diff) |
AMBARI-23945. Infra Solr migration: use async request for collection deletion
-rw-r--r-- | ambari-infra/ambari-infra-solr-client/README.md | 3 | ||||
-rwxr-xr-x | ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py | 112 |
2 files changed, 102 insertions, 13 deletions
diff --git a/ambari-infra/ambari-infra-solr-client/README.md b/ambari-infra/ambari-infra-solr-client/README.md index b4c4614ac5..a14f92a881 100644 --- a/ambari-infra/ambari-infra-solr-client/README.md +++ b/ambari-infra/ambari-infra-solr-client/README.md @@ -853,6 +853,9 @@ Options: only if no backup path in the ini file --version=INDEX_VERSION lucene index version for migration (6.6.2 or 7.3.1) + --solr-async-request-tries=SOLR_ASYNC_REQUEST_TRIES + number of max tries for async Solr requests (e.g.: + delete operation) --request-tries=REQUEST_TRIES number of tries for BACKUP/RESTORE status api calls in the request diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py index 56ab9ad97e..1842333df0 100755 --- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py +++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py @@ -34,7 +34,7 @@ import ConfigParser import solrDataManager as solr_data_manager from datetime import datetime, timedelta -from random import randrange +from random import randrange, randint from subprocess import Popen, PIPE HTTP_PROTOCOL = 'http' @@ -68,8 +68,9 @@ CREATE_CONFIGURATIONS_URL = '/configurations' LIST_SOLR_COLLECTION_URL = '{0}/admin/collections?action=LIST&wt=json' CREATE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=CREATE&name={1}&collection.configName={2}&numShards={3}&replicationFactor={4}&maxShardsPerNode={5}&wt=json' -DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json' +DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json&async={2}' RELOAD_SOLR_COLLECTION_URL = '{0}/admin/collections?action=RELOAD&name={1}&wt=json' +REQUEST_STATUS_SOLR_COLLECTION_URL = '{0}/admin/collections?action=REQUESTSTATUS&requestid={1}' CORE_DETAILS_URL = '{0}replication?command=details&wt=json' INFRA_SOLR_CLIENT_BASE_PATH = '/usr/lib/ambari-infra-solr-client/' @@ -818,8 +819,69 @@ def is_logsearch_available(config, service_filter): return 'LOGSEARCH' in service_filter and config.has_section('logsearch_collections') \ and config.has_option('logsearch_collections', 'enabled') and config.get('logsearch_collections', 'enabled') == 'true' -def delete_collection(options, config, collection, solr_urls): - request = DELETE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options), collection) +def monitor_solr_async_request(options, config, status_request, request_id): + request_status_json_cmd=create_solr_api_request_command(status_request, config) + logger.debug("Solr request: {0}".format(status_request)) + async_request_success_msg = "Async Solr request (id: {0}) {1}COMPLETED{2}".format(request_id, colors.OKGREEN, colors.ENDC) + async_request_timeout_msg = "Async Solr request (id: {0}) {1}FAILED{2}".format(request_id, colors.FAIL, colors.ENDC) + async_request_fail_msg = "\nAsync Solr request (id: {0}) {1}TIMED OUT{2} (increase --solr-async-request-tries if required, default is 400)".format(request_id, colors.FAIL, colors.ENDC) + max_tries = options.solr_async_request_tries if options.solr_async_request_tries else 400 + tries = 0 + sys.stdout.write("Start monitoring Solr request with id {0} ...".format(request_id)) + sys.stdout.flush() + async_request_finished = False + async_request_failed = False + async_request_timed_out = False + while not async_request_finished: + tries = tries + 1 + process = Popen(request_status_json_cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = process.communicate() + if process.returncode != 0: + raise Exception("{0} command failed: {1}".format(request_status_json_cmd, str(err))) + else: + response=json.loads(str(out)) + logger.debug(response) + if 'status' in response: + async_state=response['status']['state'] + async_msg=response['status']['msg'] + if async_state == "completed": + async_request_finished = True + sys.stdout.write("\nSolr response message: {0}\n".format(async_msg)) + sys.stdout.flush() + elif async_state == "failed": + async_request_finished = True + async_request_failed = True + sys.stdout.write("\nSolr response message: {0}\n".format(async_msg)) + sys.stdout.flush() + else: + if not options.verbose: + sys.stdout.write(".") + sys.stdout.flush() + logger.debug(str(async_msg)) + logger.debug("Sleep 5 seconds ...") + time.sleep(5) + else: + raise Exception("The 'status' field is missing from the response: {0}".format(response)) + if tries == max_tries: + async_request_finished = True + async_request_timed_out = True + + if async_request_failed: + if async_request_timed_out: + print async_request_timeout_msg + sys.exit(1) + else: + print async_request_fail_msg + sys.exit(1) + else: + print async_request_success_msg + return request_id + + +def delete_collection(options, config, collection, solr_urls, response_data_map): + async_id = str(randint(1000,100000)) + solr_url = get_random_solr_url(solr_urls, options) + request = DELETE_SOLR_COLLECTION_URL.format(solr_url, collection, async_id) logger.debug("Solr request: {0}".format(request)) delete_collection_json_cmd=create_solr_api_request_command(request, config) process = Popen(delete_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True) @@ -827,8 +889,10 @@ def delete_collection(options, config, collection, solr_urls): if process.returncode != 0: raise Exception("{0} command failed: {1}".format(delete_collection_json_cmd, str(err))) response=json.loads(str(out)) - if 'success' in response: - print 'Deleting collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC) + if 'requestid' in response: + print 'Deleting collection {0} request sent. {1}DONE{2}'.format(collection, colors.OKGREEN, colors.ENDC) + response_data_map['request_id']=response['requestid'] + response_data_map['status_request']=REQUEST_STATUS_SOLR_COLLECTION_URL.format(solr_url, response['requestid']) return collection else: raise Exception("DELETE collection ('{0}') failed. Response: {1}".format(collection, str(out))) @@ -980,17 +1044,26 @@ def delete_logsearch_collections(options, config, collections_json_location, col history_collection = config.get('logsearch_collections', 'history_collection_name') if service_logs_collection in collections: solr_urls = get_solr_urls(options, config, service_logs_collection, collections_json_location) - retry(delete_collection, options, config, service_logs_collection, solr_urls, context='[Delete {0} collection]'.format(service_logs_collection)) + response_map={} + retry(delete_collection, options, config, service_logs_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(service_logs_collection)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(service_logs_collection) if audit_logs_collection in collections: solr_urls = get_solr_urls(options, config, audit_logs_collection, collections_json_location) - retry(delete_collection, options, config, audit_logs_collection, solr_urls, context='[Delete {0} collection]'.format(audit_logs_collection)) + response_map={} + retry(delete_collection, options, config, audit_logs_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(audit_logs_collection)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(audit_logs_collection) if history_collection in collections: solr_urls = get_solr_urls(options, config, history_collection, collections_json_location) - retry(delete_collection, options, config, history_collection, solr_urls, context='[Delete {0} collection]'.format(history_collection)) + response_map={} + retry(delete_collection, options, config, history_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(history_collection)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(history_collection) @@ -1000,17 +1073,26 @@ def delete_atlas_collections(options, config, collections_json_location, collect vertex_index_collection = config.get('atlas_collections', 'vertex_index_name') if fulltext_collection in collections: solr_urls = get_solr_urls(options, config, fulltext_collection, collections_json_location) - retry(delete_collection, options, config, fulltext_collection, solr_urls, context='[Delete {0} collection]'.format(fulltext_collection)) + response_map={} + retry(delete_collection, options, config, fulltext_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(fulltext_collection)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(fulltext_collection) if edge_index_collection in collections: solr_urls = get_solr_urls(options, config, edge_index_collection, collections_json_location) - retry(delete_collection, options, config, edge_index_collection, solr_urls, context='[Delete {0} collection]'.format(edge_index_collection)) + response_map={} + retry(delete_collection, options, config, edge_index_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(edge_index_collection)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(edge_index_collection) if vertex_index_collection in collections: solr_urls = get_solr_urls(options, config, vertex_index_collection, collections_json_location) - retry(delete_collection, options, config, vertex_index_collection, solr_urls, context='[Delete {0} collection]'.format(vertex_index_collection)) + response_map={} + retry(delete_collection, options, config, vertex_index_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(vertex_index_collection)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(vertex_index_collection) @@ -1018,7 +1100,10 @@ def delete_ranger_collection(options, config, collections_json_location, collect ranger_collection_name = config.get('ranger_collection', 'ranger_collection_name') if ranger_collection_name in collections: solr_urls = get_solr_urls(options, config, ranger_collection_name, collections_json_location) - retry(delete_collection, options, config, ranger_collection_name, solr_urls, context='[Delete {0} collection]'.format(ranger_collection_name)) + response_map={} + retry(delete_collection, options, config, ranger_collection_name, solr_urls, response_map, context='[Delete {0} collection]'.format(ranger_collection_name)) + retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'], + context="[Monitor Solr async request, id: {0}]".format(response_map['request_id'])) else: print 'Collection {0} does not exist or filtered out. Skipping delete operation'.format(ranger_collection_name) @@ -1824,6 +1909,7 @@ if __name__=="__main__": parser.add_option("--ranger-index-location", dest="ranger_index_location", type="string", help="location of the index backups (for ranger). required only if no backup path in the ini file") parser.add_option("--version", dest="index_version", type="string", default="6.6.2", help="lucene index version for migration (6.6.2 or 7.3.1)") + parser.add_option("--solr-async-request-tries", dest="solr_async_request_tries", type="int", default=400, help="number of max tries for async Solr requests (e.g.: delete operation)") parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request") parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request") parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command") |