Commit 1841a376 authored by Marko Kollo's avatar Marko Kollo
Browse files

Fixed multi-index, out-of-sync fields and more.

parent 5fd1cd60
# -*- coding: utf-8 -*-
import logging
import requests
from requests.exceptions import ConnectionError, Timeout
import logging
import json
from utils.mlp_task_adapter import MLPTaskAdapter
from utils.mlp_task_adapter import MLPTaskAdapter, Helpers
class MlpPreprocessor(object):
"""Preprocessor implementation for running TEXTA Multilingual Processor (MLP) on the selected documents.
def _reload_env(self):
import dotenv
def __init__(self, mlp_url=None, enabled_features=['text', 'lang', 'texta_facts']):
:param mlp_url: full URL to the MLP instance. Must be accessible.
......@@ -28,6 +26,7 @@ class MlpPreprocessor(object):
self._enabled_features = set(enabled_features)
def transform(self, documents, **kwargs):
"""Takes input documents and enhances them with MLP output.
......@@ -47,19 +46,37 @@ class MlpPreprocessor(object):
for input_feature in input_features:
texts = [document[input_feature] for document in documents if input_feature in document]
data = {'texts': json.dumps(texts, ensure_ascii=False), 'doc_path': input_feature+'_mlp'}
feature_path = input_feature.split(".")
if len(feature_path) > 1:
texts = [Helpers.traverse_nested_dict_by_keys(document, feature_path) for document in documents]
texts = [document[input_feature] for document in documents if input_feature in document]
data = {'texts': json.dumps(texts, ensure_ascii=False), 'doc_path': input_feature + '_mlp'}
analyzation_data, errors = MLPTaskAdapter(self._mlp_url, mlp_type='mlp').process(data)
for analyzation_idx, analyzation_datum in enumerate(analyzation_data):
analyzation_datum = analyzation_datum[0]
documents[analyzation_idx][input_feature+'_mlp'] = analyzation_datum['text']
documents[analyzation_idx][input_feature+'_mlp']['lang'] = analyzation_datum['text']['lang']
input_feature_path = input_feature.split(".")
if len(input_feature) == 1:
documents[analyzation_idx][input_feature + '_mlp'] = analyzation_datum['text']
documents[analyzation_idx][input_feature + '_mlp']['lang'] = analyzation_datum['text']['lang']
if 'texta_facts' not in documents[analyzation_idx]:
documents[analyzation_idx]['texta_facts'] = []
# Make sure the last field is used as the path.
mlp_field_path = input_feature_path[:-1] + [input_feature_path[-1] + "_mlp"]
Helpers.set_in_dict(documents[analyzation_idx], mlp_field_path, analyzation_datum['text'])
lang_path = mlp_field_path + ["lang"]
Helpers.set_in_dict(documents[analyzation_idx], lang_path, analyzation_datum['text']['lang'])
if 'texta_facts' not in documents[analyzation_idx]:
documents[analyzation_idx]["texta_facts"] = []
if 'texta_facts' not in documents[analyzation_idx]:
documents[analyzation_idx]['texta_facts'] = []
return {'documents': documents, 'meta': {}, 'errors': errors}
......@@ -95,20 +95,31 @@ class ES_Manager:
return response
def _get_wildcard_index_names(self, wildcarded_string: str) -> List[str]:
    """
    Takes in a wildcarded string and returns a list of indices that match
    the pattern. The alias endpoint was chosen because it returns the least amount of data
    for the network traffic.

    :param wildcarded_string: index name or pattern, inserted verbatim into the URL path.
    :return: List of indices that match the pattern.
    """
    url = "{}/{}/_alias".format(self.es_url, wildcarded_string)
    response = requests.get(url=url).json()
    # NOTE(review): the HTTP status is not checked here, so the top-level keys of an
    # error payload would be returned as if they were index names -- confirm with callers.
    # Materialise the keys: dict.keys() is a view, not the List[str] the signature promises.
    return list(response.keys())
def update_mapping_structure(self, new_field, new_field_properties):
url = '{0}/{1}/_mappings/'.format(self.es_url, self.stringify_datasets())
response = self.plain_get(url)
for index in self.stringify_datasets().split(','):
for mapping in response[index]['mappings'].keys():
properties = response[index]['mappings'][mapping]['properties']
get_response = self.plain_get(url)
for index in self._get_wildcard_index_names(self.stringify_datasets()):
for mapping in get_response[index]['mappings'].keys():
properties = get_response[index]['mappings'][mapping]['properties']
if new_field not in properties:
properties[new_field] = new_field_properties
properties = {'properties': properties}
url = '{0}/{1}/_mapping/{2}'.format(self.es_url, index, mapping)
response = self.plain_put(url, json.dumps(properties))
put_response = self.plain_put(url, json.dumps(properties))
def update_documents(self):
response = self.plain_post(
......@@ -135,7 +146,7 @@ class ES_Manager:
sub_mapping = [{'path': k, 'type': 'fact'}]
# deal with object & nested structures
# deal with object & nested structures
elif 'properties' in v and k not in self.TEXTA_RESERVED:
sub_structure = v['properties']
......@@ -757,6 +768,7 @@ class ES_Manager:
return normal_fields, nested_fields
def is_field_text_field(field_name, index_name):
text_types = ["text", "keyword"]
from time import sleep
import requests
from functools import reduce # forward compatibility for Python 3
import operator
import json
......@@ -14,6 +16,16 @@ class Helpers:
yield iterator[i:i + chunk_size]
@staticmethod
def set_in_dict(data_dict, map_list, value):
    """Set ``value`` in a nested dictionary at the position given as a list of keys.

    :param data_dict: nested container that is modified in place.
    :param map_list: keys leading to the target; every key but the last must already
        resolve inside ``data_dict``, the last one is the key that gets assigned.
    :param value: object stored at the final key.
    """
    parent_keys, final_key = map_list[:-1], map_list[-1]
    # Walk down to the container that owns the final key, then assign into it.
    parent = Helpers.traverse_nested_dict_by_keys(data_dict, parent_keys)
    parent[final_key] = value
def traverse_nested_dict_by_keys(data_dict, keys):
    """Follow ``keys`` one by one into ``data_dict`` and return the object reached.

    :param data_dict: nested container to descend into (anything supporting ``obj[key]``).
    :param keys: iterable of keys/indices applied in order; an empty iterable
        returns ``data_dict`` itself.
    :return: the object found at the end of the key path.
    """
    current = data_dict
    for key in keys:
        # Each step indexes into the result of the previous one.
        current = current[key]
    return current
def pars_data_string(data: dict):
data = json.loads(data["texts"])
......@@ -21,7 +33,7 @@ class Helpers:
def divide_tasks_into_chunks(data: dict):
def divide_tasks_into_chunks(data: dict, chunk_size=6):
Split all the documents inside the dictionary into equally sized objects
to make the best use out of Celery's multiple workers. All of the gained
......@@ -29,16 +41,18 @@ class Helpers:
tasks_data = []
list_of_texts = json.loads(data["texts"])
chunks = Helpers.chunks(list_of_texts)
chunks = Helpers.chunks(list_of_texts, chunk_size=chunk_size)
for chunk in chunks:
input_data = {"texts": json.dumps(chunk)}
input_data = {"texts": json.dumps(chunk, ensure_ascii=False), "doc_path": data["doc_path"]}
return tasks_data
class MLPTaskAdapter(object):
def __init__(self, mlp_url, mlp_type='mlp'):
self.mlp_url = mlp_url
......@@ -52,6 +66,7 @@ class MLPTaskAdapter(object):
# Intermediary task management.
self.tasks = []
self.finished_task_ids = []
self.failed_task_ids = []
# Final return values.
self.analyzation_data = []
......@@ -66,6 +81,7 @@ class MLPTaskAdapter(object):
task_info = response.json()
# {'url': 'http://localhost:5000/task/status/c2b1119e...', 'task': 'c2b1119e...'}
task_info["position_index"] = len(self.tasks)
......@@ -74,32 +90,30 @@ class MLPTaskAdapter(object):
Get the state of the celery task using MLP's status endpoint.
This will be good for reporting any retries, errors and successful tasks.
url = self.task_status_url.format(self.mlp_url, task_id)
response = requests.get(url).json()
return response
url = self.task_status_url.format(self.mlp_url.strip("/"), task_id)
response = requests.get(url)
return response.json()
def _handle_pending_status(self, task_index: int):
def _handle_error_status(self, task_status: dict):
def _handle_error_status(self, task_index: int):
def _handle_success_status(self, task_state: dict, task_index: int):
result = task_state["status"]["result"]["result"]
def _handle_success_status(self, task_state):
result = task_state["result"]
for index_int, list_of_text_dicts in enumerate(result):
self.analyzation_data[task_index * MLPTaskAdapter.CELERY_CHUNK_SIZE + index_int] = list_of_text_dicts
self.parsed_document_count += len(result)
def process(self, data):
self.total_document_count = len(Helpers.pars_data_string(data))
# Split all the documents into chunks; each chunk becomes a SEPARATE Celery task.
celery_task_chunk = Helpers.divide_tasks_into_chunks(data)
celery_task_chunk = Helpers.divide_tasks_into_chunks(data, chunk_size=MLPTaskAdapter.CELERY_CHUNK_SIZE)
self.analyzation_data = [None] * len(Helpers.pars_data_string(data))
# For each previously split chunk, start a separate Celery task.
for celery_input in celery_task_chunk:
......@@ -110,23 +124,20 @@ class MLPTaskAdapter(object):
while self.tasks:
# Get all the states at once to avoid unnecessary delays.
task_states = [self._poll_task_status(task["task"]) for task in self.tasks]
task_states = [{"status": self._poll_task_status(task["task"]), "position_index": task["position_index"]} for task in self.tasks]
# Route all the Celery task results to their respective handlers.
for index, task_state in enumerate(task_states):
task_status = task_state["status"]["status"]
task_status = task_state["status"]
if task_status == "PENDING":
elif task_status == "FAILURE":
if task_status == "FAILURE":
elif task_status == "SUCCESS":
self._handle_success_status(task_state, task_state["position_index"])
# Remove all the tasks that have finished their jobs.
self.tasks = [task for task in self.tasks if task["task"] not in self.finished_task_ids]
# Remove all the tasks that have either finished their jobs or failed.
self.tasks = [task for task in self.tasks if task["task"] not in self.finished_task_ids and task["task"] not in self.failed_task_ids]
sleep(3) # Wait a small amount of time until checking wheter the task has finished.
return self.analyzation_data, self.errors
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment