Commit d560b44c authored by Marko Kollo's avatar Marko Kollo

WIP Handling ES timeouts and retries.

parent 0a39d593
import logging
-from pprint import pprint
-from searcher.dashboard.metafile import BaseDashboardFormater
from typing import Dict, List
+from searcher.dashboard.metafile import BaseDashboardFormater
+from texta.settings import ERROR_LOGGER
@@ -39,6 +37,7 @@ class MultiSearchFormater(BaseDashboardFormater):
return final_result
def _format_initial_response(self, response):
"""
Because MultiSearch does not contain a single response, but many, it was
@@ -58,6 +57,7 @@ class MultiSearchFormater(BaseDashboardFormater):
return final_result
def _add_value_count_percentages(self, aggregation_dict: dict, total_document_count: int, field_counts: dict):
"""
Traverses the previously grouped dictionary of ES aggregations, loops through the value_count
@@ -71,13 +71,14 @@ class MultiSearchFormater(BaseDashboardFormater):
aggregation_dict["value_count"] = {}
for field_name, doc_count in field_counts.items():
-percentage = round( doc_count * 100 / total_document_count, 2)
-aggregation_dict["value_count"][field_name] = {'doc_count' : doc_count, 'percentage': percentage}
+percentage = round(doc_count * 100 / total_document_count, 2)
+aggregation_dict["value_count"][field_name] = {'doc_count': doc_count, 'percentage': percentage}
except ZeroDivisionError as e:
logging.getLogger(ERROR_LOGGER).exception(e)
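For reference, a minimal standalone sketch of what _add_value_count_percentages computes; the field names and counts below are invented for illustration:

    # Hypothetical inputs: 200 documents in total, exists-counts for two fields.
    total_document_count = 200
    field_counts = {"comment": 150, "comment_mlp.lemmas": 50}

    aggregation_dict = {"value_count": {}}
    for field_name, doc_count in field_counts.items():
        percentage = round(doc_count * 100 / total_document_count, 2)
        aggregation_dict["value_count"][field_name] = {'doc_count': doc_count, 'percentage': percentage}

    # aggregation_dict["value_count"] now holds:
    # {'comment': {'doc_count': 150, 'percentage': 75.0},
    #  'comment_mlp.lemmas': {'doc_count': 50, 'percentage': 25.0}}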
def _format_aggregation_dict(self, agg_dict: dict):
"""
Takes the aggregation results of a single index and formats them into the
@@ -95,6 +96,7 @@ class MultiSearchFormater(BaseDashboardFormater):
if 'nested' not in field_name:
agg_type, field_name, bucket_suffix = field_name.split('#')
else:
agg_type, field_name, bucket_suffix = ('nested', 'texta_facts', '')
......
@@ -47,40 +47,40 @@ class MultiSearchConductor:
clean_field_name = self._remove_dot_notation(field_name)
search_gateway = elasticsearch_dsl.Search(index=index).using(es)
-self.field_counts[field_name] = search_gateway.query("exists", field=clean_field_name).count()
+self.field_counts[clean_field_name] = search_gateway.query("exists", field=clean_field_name).count()
# Do not play around with the #, they exist to avoid naming conflicts, as awkward as they may be.
# TODO Find a better solution for this.
if field_type == "text":
if query_body is not None:
search_dsl = self._create_search_object(query_body=query_body, index=index, es=es)
search_dsl.aggs.bucket("sigsterms#{0}#text_sigterms".format(field_name), 'significant_text', field=field_name, filter_duplicate_text=True)
search_dsl.aggs.bucket("sigsterms#{0}#text_sigterms".format(clean_field_name), 'significant_text', field=field_name, filter_duplicate_text=True)
self.multi_search = self.multi_search.add(search_dsl)
elif field_type == "keyword":
search_dsl = self._create_search_object(query_body=query_body, index=index, es=es)
search_dsl.aggs.bucket("sterms#{0}#keyword_terms".format(field_name), 'terms', field=field_name)
search_dsl.aggs.bucket("sterms#{0}#keyword_terms".format(clean_field_name), 'terms', field=field_name)
self.multi_search = self.multi_search.add(search_dsl)
elif field_type == "date":
search_dsl = self._create_search_object(query_body=query_body, index=index, es=es)
search_dsl.aggs.bucket("date_histogram#{0}_month#date_month".format(field_name), 'date_histogram', field=field_name, interval='month')
search_dsl.aggs.bucket("date_histogram#{0}_year#date_year".format(field_name), 'date_histogram', field=field_name, interval='year')
search_dsl.aggs.bucket("date_histogram#{0}_month#date_month".format(clean_field_name), 'date_histogram', field=field_name, interval='month')
search_dsl.aggs.bucket("date_histogram#{0}_year#date_year".format(clean_field_name), 'date_histogram', field=field_name, interval='year')
self.multi_search = self.multi_search.add(search_dsl)
elif field_type == "integer":
search_dsl = self._create_search_object(query_body=query_body, index=index, es=es)
search_dsl.aggs.bucket("extended_stats#{0}#int_stats".format(field_name), 'extended_stats', field=field_name)
search_dsl.aggs.bucket("extended_stats#{0}#int_stats".format(clean_field_name), 'extended_stats', field=field_name)
self.multi_search = self.multi_search.add(search_dsl)
elif field_type == "long":
search_dsl = self._create_search_object(query_body=query_body, index=index, es=es)
-search_dsl.aggs.bucket('extended_stats#{0}#long_stats'.format(field_name), 'extended_stats', field=field_name)
+search_dsl.aggs.bucket('extended_stats#{0}#long_stats'.format(clean_field_name), 'extended_stats', field=field_name)
self.multi_search = self.multi_search.add(search_dsl)
elif field_type == "float":
search_dsl = self._create_search_object(query_body=query_body, index=index, es=es)
search_dsl.aggs.bucket("extended_stats#{0}#float_stats".format(field_name), 'extended_stats', field=field_name)
search_dsl.aggs.bucket("extended_stats#{0}#float_stats".format(clean_field_name), 'extended_stats', field=field_name)
self.multi_search = self.multi_search.add(search_dsl)
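The '#'-separated bucket names built above are what MultiSearchFormater later takes apart with field_name.split('#') (see the agg_type, field_name, bucket_suffix line earlier in this diff). A small round-trip illustration with an invented field name:

    clean_field_name = "comment"
    bucket_name = "sterms#{0}#keyword_terms".format(clean_field_name)   # -> "sterms#comment#keyword_terms"

    agg_type, parsed_field, bucket_suffix = bucket_name.split('#')
    # agg_type == "sterms", parsed_field == "comment", bucket_suffix == "keyword_terms"
    # A '#' inside a field name would break this split, hence the warning above
    # about not changing the separator.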
def _texta_facts_agg_handler(self, query_body, index, es):
@@ -106,7 +106,7 @@ class MultiSearchConductor:
:return: Name of the field with the dot notation removed, e.g. article_lead
"""
if '.' in field_name:
-field_name = field_name.split('.')[0]
+field_name = field_name.replace(".keyword", "")
return field_name
else:
return field_name
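To make the change concrete, this is how the two variants behave on dotted field names (example names only):

    field_name = "comment.keyword"
    field_name.split('.')[0]             # old behaviour -> "comment"
    field_name.replace(".keyword", "")   # new behaviour -> "comment"

    nested_name = "comment_mlp.lemmas.keyword"
    nested_name.split('.')[0]            # -> "comment_mlp" (inner path is lost)
    nested_name.replace(".keyword", "")  # -> "comment_mlp.lemmas" (inner path is kept)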
......
from collections import OrderedDict, defaultdict
import dictor
from utils.highlighter import Highlighter, ColorPicker
from searcher.view_functions.general.searcher_utils import additional_option_cut_text
from searcher.view_functions.build_search.translit_highlighting import hl_transliterately
@@ -51,12 +54,35 @@ def execute_search(es_m, es_params):
field_path = col.split('.')
# Get content for the fields and make facts human readable
# Possible outcomes for a field:
# Normal field value - covered by dictor.
# Object field value - covered by dictor
# List of normal values - check for list and element type
# List of objects - check for list and dict element, get the key values.
content = hit['_source']
if col == FACT_FIELD and col in hit['_source']:
content = improve_facts_readability(hit['_source'][col])
else:
-for p in field_path:
-content = content[p] if p in content else ''
+# When the value of the field is a normal field or object field.
+if dictor.dictor(content, col, default=""):
+content = dictor.dictor(content, col, default="")
+else:
+possible_list = dictor.dictor(content, ".".join(field_path[:-2]), default="")
+if isinstance(possible_list, list) and not isinstance(possible_list[0], dict):  # It's a normal list field
+content = possible_list
+elif isinstance(possible_list, list) and isinstance(possible_list[0], dict):  # It's an array of objects.
+list_of_items = dictor.dictor(content, ".".join(field_path[:-1]), default="")
+dictionary_key = field_path[-1]
+values = [item[dictionary_key] for item in list_of_items]
+content = "\n".join(values)
+else:  # Field is missing
+content = None
content = str(content)
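The four cases listed in the comment above can be illustrated with dictor directly; the document and field names below are made up for the example:

    import dictor

    doc = {
        "title": "plain value",                          # normal field value
        "meta": {"author": "Jane"},                      # object field value
        "tags": ["a", "b"],                              # list of normal values
        "comments": [{"text": "hi"}, {"text": "bye"}],   # list of objects
    }

    dictor.dictor(doc, "title", default="")          # -> "plain value"
    dictor.dictor(doc, "meta.author", default="")    # -> "Jane"
    dictor.dictor(doc, "tags", default="")           # -> ["a", "b"], used as-is
    items = dictor.dictor(doc, "comments", default="")
    "\n".join(item["text"] for item in items)        # -> "hi\nbye", as in the array-of-objects branch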
if strip_html:
@@ -119,7 +145,7 @@ def _prettify_standardize_hls(name_to_inner_hits, col, content, old_content):
return content, hl_data
def _transliterate(cols_data, row, translit_cols=['text', 'translit', 'lemmas']):
# To get nested col value before '.'
hl_cols = [x for x in cols_data if len(x.split('.')) > 1 and x.split('.')[-1] in translit_cols]
# Transliterate the highlighting between hl_cols
......
@@ -100,8 +100,7 @@ def get_fields(es_m):
path_list = path.split('.')
-label = '{0} --> {1}'.format(path_list[0], path_list[-1]) if len(path_list) > 1 else path_list[0]
-label = label.replace('-->', u'→')
+label = u'→'.join(path_list)
if data['type'] == 'date':
data['range'] = get_daterange(es_m, path)
......
@@ -261,6 +261,6 @@ function swalCustomTypeDisplay(swalType, title, text) {
title: title,
text: text,
heightAuto: false,
-type: swalType
+type: swalType,
})
}
import sys
from datetime import datetime
import json
import os
@@ -24,7 +25,7 @@ from task_manager.document_preprocessor import PREPROCESSOR_INSTANCES
class PreprocessorWorker(BaseWorker):
-def __init__(self, scroll_size=200, time_out='10m'):
+def __init__(self, scroll_size=100, time_out='50m'):
self.es_m = None
self.task_id = None
self.params = None
@@ -34,12 +35,14 @@ class PreprocessorWorker(BaseWorker):
self._reload_env()
self.info_logger, self.error_logger = self._generate_loggers()
def _reload_env(self):
from dotenv import load_dotenv
from pathlib import Path
env_path = str(Path('.env'))
load_dotenv(dotenv_path=env_path)
def _generate_loggers(self):
import graypy
import os
@@ -52,6 +55,7 @@ class PreprocessorWorker(BaseWorker):
return info_logger, error_logger
def run(self, task_id):
self.task_id = task_id
task = Task.objects.get(pk=self.task_id)
@@ -86,6 +90,7 @@ class PreprocessorWorker(BaseWorker):
task.result = json.dumps({'error': repr(e)})
task.update_status(Task.STATUS_FAILED, set_time_completed=True)
def _preprocessor_worker(self):
field_paths = []
show_progress = ShowProgress(self.task_id)
@@ -132,11 +137,21 @@ class PreprocessorWorker(BaseWorker):
show_progress.update(total_hits)
self.es_m.update_documents_by_id(doc_ids)
# Get next page if any
-response = self.es_m.scroll(scroll_id=scroll_id, time_out=self.scroll_time_out)
-total_hits = len(response['hits']['hits'])
-scroll_id = response['_scroll_id']
-# For partial update
-doc_ids = [x['_id'] for x in response['hits']['hits'] if '_id' in x]
+try:
+response = self.es_m.scroll(scroll_id=scroll_id, time_out=self.scroll_time_out)
+total_hits = len(response['hits']['hits'])
+scroll_id = response['_scroll_id']
+# For partial update
+doc_ids = [x['_id'] for x in response['hits']['hits'] if '_id' in x]
+except KeyError as e:
+t, v, tb = sys.exc_info()
+self.error_logger.exception(t)
+self.error_logger.exception(v)
+self.error_logger.exception(tb)
+self.error_logger.exception(response)
+raise e
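This commit also adds a generic retry decorator (utils/decorators.py, further down in this diff). As a sketch only, with a hypothetical helper name, the scroll call could additionally be retried on transient ES failures the same way MLPTaskAdapter retries its HTTP calls:

    from utils.decorators import retry

    # Hypothetical wrapper, not part of this commit: re-issues the scroll request and
    # lets the decorator retry it on any exception, waiting 10s, 20s, 40s, ... between attempts.
    @retry(Exception, tries=5, delay=10, backoff=2)
    def scroll_with_retry(es_m, scroll_id, scroll_time_out):
        return es_m.scroll(scroll_id=scroll_id, time_out=scroll_time_out)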
task = Task.objects.get(pk=self.task_id)
show_progress.update(100)
@@ -156,6 +171,7 @@ class PreprocessorWorker(BaseWorker):
task.time_completed = datetime.now()
task.save()
def _prepare_preprocessor_data(self, response: dict):
"""
Separates document dicts and id strings from the pure ES response and changes
@@ -178,6 +194,7 @@ class PreprocessorWorker(BaseWorker):
return documents, parameter_dict, ids, document_locations
@staticmethod
def _parse_query(parameters):
"""
@@ -196,6 +213,7 @@ class PreprocessorWorker(BaseWorker):
query = json.loads(Search.objects.get(pk=int(search)).query)
return query
@staticmethod
def _check_if_request_bad(args):
'''Check if models/fields are selected'''
......
import time
from functools import wraps
from django.http import HttpResponseRedirect
from texta.settings import SERVER_TYPE
@@ -10,3 +13,52 @@ def HTTPS_ResponseRedirect(request, url):
absolute_url = request.build_absolute_uri(url)
new_url = "https%s" % absolute_url[4:]
return HttpResponseRedirect(new_url)
def retry(ExceptionToCheck, tries=4, delay=3, backoff=2, logger=None):
"""Retry calling the decorated function using an exponential backoff.
http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
:param ExceptionToCheck: the exception to check. may be a tuple of
exceptions to check
:type ExceptionToCheck: Exception or tuple
:param tries: number of times to try (not retry) before giving up
:type tries: int
:param delay: initial delay between retries in seconds
:type delay: int
:param backoff: backoff multiplier e.g. value of 2 will double the delay
each retry
:type backoff: int
:param logger: logger to use. If None, print
:type logger: logging.Logger instance
"""
def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
while mtries > 1:
try:
return f(*args, **kwargs)
except ExceptionToCheck as e:
msg = "%s, Retrying in %d seconds..." % (str(e), mdelay)
if logger:
logger.warning(msg)
else:
print(msg)
time.sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(*args, **kwargs)
return f_retry # true decorator
return deco_retry
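A minimal usage sketch of the decorator; the function, URL and logger name below are placeholders, not part of the codebase:

    import logging
    import requests

    from utils.decorators import retry

    @retry(requests.exceptions.RequestException, tries=4, delay=3, backoff=2,
           logger=logging.getLogger("retry_demo"))
    def fetch_status(url="http://localhost:5000/status"):
        # With tries=4, delay=3, backoff=2 the waits between attempts are 3s, 6s and 12s;
        # the fourth and final call runs outside the loop, so its exception propagates.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()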
import json
import logging
from time import sleep
from utils.decorators import retry
import requests
from texta.settings import ERROR_LOGGER
@@ -60,8 +60,9 @@ class Helpers:
class MLPTaskAdapter(object):
-CELERY_CHUNK_SIZE = 6
+CELERY_CHUNK_SIZE = 10
MAX_NETWORK_RETRY_COUNT = 5
MAX_TASK_RETRY_COUNT = 1000
def __init__(self, mlp_url, mlp_type='mlp'):
self.mlp_url = mlp_url
@@ -82,36 +83,45 @@ class MLPTaskAdapter(object):
self.errors = {}
@retry(Exception, tries=10, delay=10, backoff=5, logger=logging.getLogger(ERROR_LOGGER))
def _start_mlp_celery_task(self, mlp_input):
"""
Uses the MLP endpoint to trigger a Celery task inside the MLP server.
'url': 'http://localhost:5000/task/status/c2b1119e...', 'task': 'c2b1119e...'}
"""
-response = requests.post(self.start_task_url, data=mlp_input, )
-response.raise_for_status()
+try:
+response = requests.post(self.start_task_url, data=mlp_input)
+task_info = response.json()
+task_info["position_index"] = len(self.tasks)
+task_info["retry_count"] = 0
+self.tasks.append(task_info)
+except Exception as e:
+logging.getLogger(ERROR_LOGGER).exception(mlp_input)
+logging.getLogger(ERROR_LOGGER).exception("Response Status: {} and Response Content: {}".format(response.status_code, response.text))
+raise Exception(e)  # Raise it again.
@retry(Exception, tries=10, delay=10, backoff=5, logger=logging.getLogger(ERROR_LOGGER))
def _poll_task_status(self, task_id: str):
"""
Get the state of the celery task using MLP's status endpoint.
This will be good for reporting any retries, errors and successful tasks.
"""
-url = self.task_status_url.format(self.mlp_url.strip("/"), task_id)
-response = requests.get(url)
-response.raise_for_status()
+try:
+url = self.task_status_url.format(self.mlp_url.strip("/"), task_id)
+response = requests.get(url)
+result = response.json()
+return result
+except Exception as e:
+logging.getLogger(ERROR_LOGGER).exception(task_id)
+logging.getLogger(ERROR_LOGGER).exception("Response Status: {} and Response Content: {}".format(response.status_code, response.text))
+raise Exception(e)
def _handle_error_status(self, task_status: dict):
@@ -154,8 +164,11 @@ class MLPTaskAdapter(object):
elif task_status == "SUCCESS":
self._handle_success_status(task_state, task_state["position_index"])
self.tasks[index]["retry_count"] += 1
# Remove all the tasks that have finished their jobs or failed while doing so.
self.tasks = [task for task in self.tasks if task["task"] not in self.finished_task_ids and task["task"] not in self.failed_task_ids]
-sleep(3)  # Wait a small amount of time until checking wheter the task has finished.
+self.tasks = [task for task in self.tasks if task["retry_count"] < MLPTaskAdapter.MAX_TASK_RETRY_COUNT]
+sleep(5)  # Wait a small amount of time before checking whether the task has finished.
return self.analyzation_data, self.errors
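A rough back-of-the-envelope check of the new limits, assuming roughly one poll per task per 5-second round as above:

    MAX_TASK_RETRY_COUNT = 1000   # polls of a single task before it is dropped from self.tasks
    POLL_SLEEP_SECONDS = 5        # the sleep(5) at the end of each polling round

    worst_case_minutes = MAX_TASK_RETRY_COUNT * POLL_SLEEP_SECONDS / 60
    # ~83 minutes before a hung MLP task is abandoned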