Commit a382f201 authored by Marko Kollo's avatar Marko Kollo
Browse files

WIP Celery support without error management.

parent 4f2716f2
......@@ -13,6 +13,10 @@ class MlpPreprocessor(object):
"""Preprocessor implementation for running TEXTA Multilingual Processor (MLP) on the selected documents.
def _reload_env(self):
import dotenv
def __init__(self, mlp_url=None, enabled_features=['text', 'lang', 'texta_facts']):
:param mlp_url: full URL to the MLP instance. Must be accessible.
......@@ -22,6 +26,7 @@ class MlpPreprocessor(object):
self._mlp_url = mlp_url
self._enabled_features = set(enabled_features)
def transform(self, documents, **kwargs):
"""Takes input documents and enhances them with MLP output.
from dateutil import parser
from time import sleep
import requests
import logging
import json
from utils.mlp_task_adapter import MLPTaskAdapter
class MLPLitePreprocessor(object):
Cleans texts for classification. Lemmatizes text.
def _reload_env(self):
import dotenv
def __init__(self, mlp_url=None):
self.mlp_url = mlp_url
def _process_stats(stats):
......@@ -34,7 +36,7 @@ class MLPLitePreprocessor(object):
processed_stats[stat_key] = str(len(str(stat_val)))
processed_stats[stat_key] = stat_val
return processed_stats
def transform(self, documents, **kwargs):
......@@ -69,9 +71,9 @@ class MLPLitePreprocessor(object):
analyzation_data, errors = MLPTaskAdapter(self.mlp_url, mlp_type='mlp_lite').process(data)
for analyzation_idx, analyzation_datum in enumerate(analyzation_data):
documents[analyzation_idx][input_feature+'_mlp-lite'] = {}
documents[analyzation_idx][input_feature+'_mlp-lite']['text'] = analyzation_datum['text']
documents[analyzation_idx][input_feature + '_mlp-lite'] = {}
documents[analyzation_idx][input_feature + '_mlp-lite']['text'] = analyzation_datum['text']
if output_type == 'full':
documents[analyzation_idx][input_feature+'_mlp-lite']['stats'] = self._process_stats(analyzation_datum['stats'])
documents[analyzation_idx][input_feature + '_mlp-lite']['stats'] = self._process_stats(analyzation_datum['stats'])
return {'documents': documents, 'meta': {}, 'erros': errors}
......@@ -118,6 +118,7 @@ class PreprocessorWorker(BaseWorker):
if documents:
if FACT_FIELD not in documents[0]:
self.es_m.update_mapping_structure(FACT_FIELD, FACT_PROPERTIES)
# Apply all preprocessors
for preprocessor_code in parameter_dict['preprocessors']:
preprocessor = PREPROCESSOR_INSTANCES[preprocessor_code]
......@@ -3,57 +3,118 @@ import requests
import logging
import json
class Helpers:
def chunks(iterator: list, chunk_size=6):
Divides all the items in a list into equal chunks.
for i in range(0, len(iterator), chunk_size):
yield iterator[i:i + chunk_size]
def divide_tasks_into_chunks(data: dict):
Split all the documents inside the dictionary into equally sized objects
to make the best use out of Celery's multiple workers. All of the gained
chunks will be thrown into the Celery queue.
tasks_data = []
list_of_texts = json.loads(data["texts"])
chunks = Helpers.chunks(list_of_texts)
for chunk in chunks:
input_data = {"texts": json.dumps(chunk)}
return tasks_data
class MLPTaskAdapter(object):
def __init__(self, mlp_url, mlp_type='mlp'):
self.mlp_url = mlp_url
self.start_task_url = '{0}/task/start/{1}'.format(mlp_url.strip('/'), mlp_type)
self.task_status_url = "{0}/task/status/{1}"
self.tasks = []
self.analyzation_data = []
self.errors = {}
def _start_mlp_celery_task(self, mlp_input):
Uses the MLP endpoint to trigger a Celery task inside the MLP server.
response =, data=mlp_input)
task_info = response.json()
# {'url': 'http://localhost:5000/task/status/c2b1119e...', 'task': 'c2b1119e...'}
def _poll_task_status(self, task_id: str):
Use the tasks is to get the overall state of the celery task using MLP's
status endpoint. This will be good for reporting any retries, errors and successful
url = self.task_status_url.format(self.mlp_url, task_id)
response = requests.get(url).json()
return response
def _handle_pending_status(self, task_index: int):
print("Task is still pending")
def _handle_error_status(self, task_index: int):
print("Task has failed!")
def process(self, data):
errors = {}
current_fail_count = 0
status_code = 500
while status_code == 500:
started_task =, data=data)
started_task_json = started_task.json()
current_status_code = int(started_task.status_code)
if current_status_code == 500:
current_fail_count += 1
if current_fail_count > 3:
task_status_text = 'FAILURE'
task_status = {'status': task_status_text}
status_code = 404
status_code = current_status_code
task_status_text = 'PENDING'
current_fail_count = 0
while task_status_text == 'PENDING':
pending_task = requests.get(started_task_json['url'])
pending_task_json = pending_task.json()
task_status_text = pending_task_json['status']
task_status = pending_task_json
current_fail_count = 0
current_fail_count += 1
# if fail to fetch result 3 times, declare task failed
if current_fail_count > 3:
task_status = {'status': 'failed to parse response'}
task_status_text = 'FAILURE'
logging.error('Task failed because correct response not sent from MLP')
logging.error('Failed to parse MLP response. Trying again ({})...'.format(current_fail_count))
analyzation_data = []
if task_status_text == 'FAILURE':
errors = {'task_failed': task_status}
logging.error('Failed to analyze text with MLP Lite', extra={'url':self.start_task_url, 'texts': json.dumps(data, ensure_ascii=False)})
elif task_status_text == 'SUCCESS':
analyzation_data = task_status['result']
return analyzation_data, errors
# Split all the documents into chunk, each chunk becomes a SEPARATE Celery task.
celery_task_chunk = Helpers.divide_tasks_into_chunks(data)
# For each previously split chunk, start a separate Celery task.
for celery_input in celery_task_chunk:
# As long as there are active tasks being processed, poll their status.
# If one fails or succeeds, they are removed.
while self.tasks:
# Get all the states at once to avoid unnecessary delays.
task_states = [self._poll_task_status(task["task"]) for task in self.tasks]
# Rout all the Celery task results to their respective handlers.
for index, task_state in enumerate(task_states):
if task_state["status"] == "PENDING":
elif task_state["status"] == "FAILURE":
elif task_state["status"] == "SUCCESS":
del self.tasks[index]
sleep(3) # Wait a small amount of time until checking wheter the task has finished.
return self.analyzation_data, self.errors
if __name__ == '__main__':
data_for_processing = [
"Hello there", "general kenobi", "are you interested", "in some free real estate?",
"Hello there", "general kenobi", "are you interested", "in some free real estate?",
"Hello there", "general kenobi", "are you interested", "in some free real estate?",
"Hello there", "general kenobi", "are you interested", "in some free real estate?"
mlp = MLPTaskAdapter("http://localhost:12000")
result_data, errors = mlp.process({"texts": json.dumps(data_for_processing)})
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment