summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOliver Szabo <oleewere@gmail.com>2018-07-09 22:44:06 +0200
committerOliver Szabo <oleewere@gmail.com>2018-07-09 22:47:51 +0200
commitb8c2108cad777c4daad716ae11868b9c006592b9 (patch)
tree7bcbd38c1b7e7d8cc1e984badc993e3ccd7b772f
parent219a23623349bc3715f6c667369477bd32ef1b0b (diff)
AMBARI-23945. Infra Solr migration: use async request for collection deletion
-rw-r--r--ambari-infra/ambari-infra-solr-client/README.md3
-rwxr-xr-xambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py112
2 files changed, 102 insertions, 13 deletions
diff --git a/ambari-infra/ambari-infra-solr-client/README.md b/ambari-infra/ambari-infra-solr-client/README.md
index b4c4614ac5..a14f92a881 100644
--- a/ambari-infra/ambari-infra-solr-client/README.md
+++ b/ambari-infra/ambari-infra-solr-client/README.md
@@ -853,6 +853,9 @@ Options:
only if no backup path in the ini file
--version=INDEX_VERSION
lucene index version for migration (6.6.2 or 7.3.1)
+ --solr-async-request-tries=SOLR_ASYNC_REQUEST_TRIES
+ number of max tries for async Solr requests (e.g.:
+ delete operation)
--request-tries=REQUEST_TRIES
number of tries for BACKUP/RESTORE status api calls in
the request
diff --git a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
index 56ab9ad97e..1842333df0 100755
--- a/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
+++ b/ambari-infra/ambari-infra-solr-client/src/main/python/migrationHelper.py
@@ -34,7 +34,7 @@ import ConfigParser
import solrDataManager as solr_data_manager
from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, randint
from subprocess import Popen, PIPE
HTTP_PROTOCOL = 'http'
@@ -68,8 +68,9 @@ CREATE_CONFIGURATIONS_URL = '/configurations'
LIST_SOLR_COLLECTION_URL = '{0}/admin/collections?action=LIST&wt=json'
CREATE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=CREATE&name={1}&collection.configName={2}&numShards={3}&replicationFactor={4}&maxShardsPerNode={5}&wt=json'
-DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json'
+DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json&async={2}'
RELOAD_SOLR_COLLECTION_URL = '{0}/admin/collections?action=RELOAD&name={1}&wt=json'
+REQUEST_STATUS_SOLR_COLLECTION_URL = '{0}/admin/collections?action=REQUESTSTATUS&requestid={1}'
CORE_DETAILS_URL = '{0}replication?command=details&wt=json'
INFRA_SOLR_CLIENT_BASE_PATH = '/usr/lib/ambari-infra-solr-client/'
@@ -818,8 +819,69 @@ def is_logsearch_available(config, service_filter):
return 'LOGSEARCH' in service_filter and config.has_section('logsearch_collections') \
and config.has_option('logsearch_collections', 'enabled') and config.get('logsearch_collections', 'enabled') == 'true'
-def delete_collection(options, config, collection, solr_urls):
- request = DELETE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options), collection)
+def monitor_solr_async_request(options, config, status_request, request_id):
+ request_status_json_cmd=create_solr_api_request_command(status_request, config)
+ logger.debug("Solr request: {0}".format(status_request))
+ async_request_success_msg = "Async Solr request (id: {0}) {1}COMPLETED{2}".format(request_id, colors.OKGREEN, colors.ENDC)
+ async_request_timeout_msg = "Async Solr request (id: {0}) {1}FAILED{2}".format(request_id, colors.FAIL, colors.ENDC)
+ async_request_fail_msg = "\nAsync Solr request (id: {0}) {1}TIMED OUT{2} (increase --solr-async-request-tries if required, default is 400)".format(request_id, colors.FAIL, colors.ENDC)
+ max_tries = options.solr_async_request_tries if options.solr_async_request_tries else 400
+ tries = 0
+ sys.stdout.write("Start monitoring Solr request with id {0} ...".format(request_id))
+ sys.stdout.flush()
+ async_request_finished = False
+ async_request_failed = False
+ async_request_timed_out = False
+ while not async_request_finished:
+ tries = tries + 1
+ process = Popen(request_status_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
+ out, err = process.communicate()
+ if process.returncode != 0:
+ raise Exception("{0} command failed: {1}".format(request_status_json_cmd, str(err)))
+ else:
+ response=json.loads(str(out))
+ logger.debug(response)
+ if 'status' in response:
+ async_state=response['status']['state']
+ async_msg=response['status']['msg']
+ if async_state == "completed":
+ async_request_finished = True
+ sys.stdout.write("\nSolr response message: {0}\n".format(async_msg))
+ sys.stdout.flush()
+ elif async_state == "failed":
+ async_request_finished = True
+ async_request_failed = True
+ sys.stdout.write("\nSolr response message: {0}\n".format(async_msg))
+ sys.stdout.flush()
+ else:
+ if not options.verbose:
+ sys.stdout.write(".")
+ sys.stdout.flush()
+ logger.debug(str(async_msg))
+ logger.debug("Sleep 5 seconds ...")
+ time.sleep(5)
+ else:
+ raise Exception("The 'status' field is missing from the response: {0}".format(response))
+ if tries == max_tries:
+ async_request_finished = True
+ async_request_timed_out = True
+
+ if async_request_failed:
+ if async_request_timed_out:
+ print async_request_timeout_msg
+ sys.exit(1)
+ else:
+ print async_request_fail_msg
+ sys.exit(1)
+ else:
+ print async_request_success_msg
+ return request_id
+
+
+def delete_collection(options, config, collection, solr_urls, response_data_map):
+ async_id = str(randint(1000,100000))
+ solr_url = get_random_solr_url(solr_urls, options)
+ request = DELETE_SOLR_COLLECTION_URL.format(solr_url, collection, async_id)
logger.debug("Solr request: {0}".format(request))
delete_collection_json_cmd=create_solr_api_request_command(request, config)
process = Popen(delete_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
@@ -827,8 +889,10 @@ def delete_collection(options, config, collection, solr_urls):
if process.returncode != 0:
raise Exception("{0} command failed: {1}".format(delete_collection_json_cmd, str(err)))
response=json.loads(str(out))
- if 'success' in response:
- print 'Deleting collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+ if 'requestid' in response:
+ print 'Deleting collection {0} request sent. {1}DONE{2}'.format(collection, colors.OKGREEN, colors.ENDC)
+ response_data_map['request_id']=response['requestid']
+ response_data_map['status_request']=REQUEST_STATUS_SOLR_COLLECTION_URL.format(solr_url, response['requestid'])
return collection
else:
raise Exception("DELETE collection ('{0}') failed. Response: {1}".format(collection, str(out)))
@@ -980,17 +1044,26 @@ def delete_logsearch_collections(options, config, collections_json_location, col
history_collection = config.get('logsearch_collections', 'history_collection_name')
if service_logs_collection in collections:
solr_urls = get_solr_urls(options, config, service_logs_collection, collections_json_location)
- retry(delete_collection, options, config, service_logs_collection, solr_urls, context='[Delete {0} collection]'.format(service_logs_collection))
+ response_map={}
+ retry(delete_collection, options, config, service_logs_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(service_logs_collection))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(service_logs_collection)
if audit_logs_collection in collections:
solr_urls = get_solr_urls(options, config, audit_logs_collection, collections_json_location)
- retry(delete_collection, options, config, audit_logs_collection, solr_urls, context='[Delete {0} collection]'.format(audit_logs_collection))
+ response_map={}
+ retry(delete_collection, options, config, audit_logs_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(audit_logs_collection))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(audit_logs_collection)
if history_collection in collections:
solr_urls = get_solr_urls(options, config, history_collection, collections_json_location)
- retry(delete_collection, options, config, history_collection, solr_urls, context='[Delete {0} collection]'.format(history_collection))
+ response_map={}
+ retry(delete_collection, options, config, history_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(history_collection))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(history_collection)
@@ -1000,17 +1073,26 @@ def delete_atlas_collections(options, config, collections_json_location, collect
vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
if fulltext_collection in collections:
solr_urls = get_solr_urls(options, config, fulltext_collection, collections_json_location)
- retry(delete_collection, options, config, fulltext_collection, solr_urls, context='[Delete {0} collection]'.format(fulltext_collection))
+ response_map={}
+ retry(delete_collection, options, config, fulltext_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(fulltext_collection))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(fulltext_collection)
if edge_index_collection in collections:
solr_urls = get_solr_urls(options, config, edge_index_collection, collections_json_location)
- retry(delete_collection, options, config, edge_index_collection, solr_urls, context='[Delete {0} collection]'.format(edge_index_collection))
+ response_map={}
+ retry(delete_collection, options, config, edge_index_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(edge_index_collection))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(edge_index_collection)
if vertex_index_collection in collections:
solr_urls = get_solr_urls(options, config, vertex_index_collection, collections_json_location)
- retry(delete_collection, options, config, vertex_index_collection, solr_urls, context='[Delete {0} collection]'.format(vertex_index_collection))
+ response_map={}
+ retry(delete_collection, options, config, vertex_index_collection, solr_urls, response_map, context='[Delete {0} collection]'.format(vertex_index_collection))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(vertex_index_collection)
@@ -1018,7 +1100,10 @@ def delete_ranger_collection(options, config, collections_json_location, collect
ranger_collection_name = config.get('ranger_collection', 'ranger_collection_name')
if ranger_collection_name in collections:
solr_urls = get_solr_urls(options, config, ranger_collection_name, collections_json_location)
- retry(delete_collection, options, config, ranger_collection_name, solr_urls, context='[Delete {0} collection]'.format(ranger_collection_name))
+ response_map={}
+ retry(delete_collection, options, config, ranger_collection_name, solr_urls, response_map, context='[Delete {0} collection]'.format(ranger_collection_name))
+ retry(monitor_solr_async_request, options, config, response_map['status_request'], response_map['request_id'],
+ context="[Monitor Solr async request, id: {0}]".format(response_map['request_id']))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation'.format(ranger_collection_name)
@@ -1824,6 +1909,7 @@ if __name__=="__main__":
parser.add_option("--ranger-index-location", dest="ranger_index_location", type="string", help="location of the index backups (for ranger). required only if no backup path in the ini file")
parser.add_option("--version", dest="index_version", type="string", default="6.6.2", help="lucene index version for migration (6.6.2 or 7.3.1)")
+ parser.add_option("--solr-async-request-tries", dest="solr_async_request_tries", type="int", default=400, help="number of max tries for async Solr requests (e.g.: delete operation)")
parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")