Commit 0216b675 authored by Marko Kollo's avatar Marko Kollo
Browse files

Added empty field handling to MLP and some logging to the instable bits.

parent d3e1929e
# -*- coding: utf-8 -*-
import json
import logging
from texta.settings import ERROR_LOGGER
from utils.mlp_task_adapter import MLPTaskAdapter, Helpers
......@@ -50,13 +52,17 @@ class MlpPreprocessor(object):
if len(feature_path) > 1:
texts = [Helpers.traverse_nested_dict_by_keys(document, feature_path) for document in documents]
else:
texts = [document[input_feature] for document in documents if input_feature in document]
texts = [document[input_feature] if input_feature in document else "" for document in documents]
data = {'texts': json.dumps(texts, ensure_ascii=False), 'doc_path': input_feature + '_mlp'}
analyzation_data, errors = MLPTaskAdapter(self._mlp_url, mlp_type='mlp').process(data)
for analyzation_idx, analyzation_datum in enumerate(analyzation_data):
analyzation_datum = analyzation_datum[0]
# This part is under a try catch because it's an notorious trouble maker.
try:
analyzation_datum = analyzation_datum[0]
except Exception as e:
logging.getLogger(ERROR_LOGGER).exception(analyzation_datum)
input_feature_path = input_feature.split(".")
if len(input_feature) == 1:
......
import json
import logging
from time import sleep
import requests
from functools import reduce # forward compatibility for Python 3
import operator
import json
from texta.settings import ERROR_LOGGER
class Helpers:
......@@ -18,12 +20,19 @@ class Helpers:
@staticmethod # Set a given data in a dictionary with position provided as a list
def set_in_dict(data_dict, map_list, value):
Helpers.traverse_nested_dict_by_keys(data_dict, map_list[:-1])[map_list[-1]] = value
for key in map_list[:-1]:
data_dict = data_dict.setdefault(key, {})
data_dict[map_list[-1]] = value
@staticmethod
def traverse_nested_dict_by_keys(data_dict, keys):
return reduce(operator.getitem, keys, data_dict)
for k in keys:
data_dict = data_dict.get(k, "")
if data_dict == "":
return ""
return data_dict
@staticmethod
......@@ -76,13 +85,17 @@ class MLPTaskAdapter(object):
def _start_mlp_celery_task(self, mlp_input):
"""
Uses the MLP endpoint to trigger a Celery task inside the MLP server.
'url': 'http://localhost:5000/task/status/c2b1119e...', 'task': 'c2b1119e...'}
"""
response = requests.post(self.start_task_url, data=mlp_input)
task_info = response.json()
try:
response = requests.post(self.start_task_url, data=mlp_input)
task_info = response.json()
task_info["position_index"] = len(self.tasks)
self.tasks.append(task_info)
# {'url': 'http://localhost:5000/task/status/c2b1119e...', 'task': 'c2b1119e...'}
task_info["position_index"] = len(self.tasks)
self.tasks.append(task_info)
except Exception as e:
logging.getLogger(ERROR_LOGGER).exception("Response Status: {} and Response Content: {}".format(response.status_code, response.text))
def _poll_task_status(self, task_id: str):
......@@ -90,10 +103,13 @@ class MLPTaskAdapter(object):
Get the state of the celery task using MLP's status endpoint.
This will be good for reporting any retries, errors and successful tasks.
"""
url = self.task_status_url.format(self.mlp_url.strip("/"), task_id)
response = requests.get(url)
return response.json()
try:
url = self.task_status_url.format(self.mlp_url.strip("/"), task_id)
response = requests.get(url)
result = response.json()
return result
except Exception as e:
logging.getLogger(ERROR_LOGGER).exception("Response Status: {} and Response Content: {}".format(response.status_code, response.text))
def _handle_error_status(self, task_status: dict):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment