Commit c862a43c authored by Marko Kollo's avatar Marko Kollo
Browse files

Added support for splitting data into multiple Celery tasks.

parent a382f201
......@@ -54,8 +54,10 @@ class MlpPreprocessor(object):
for analyzation_idx, analyzation_datum in enumerate(analyzation_data):
analyzation_datum = analyzation_datum[0]
documents[analyzation_idx][input_feature+'_mlp'] = analyzation_datum['text']
documents[analyzation_idx][input_feature+'_mlp']['lang'] = analyzation_datum['text']['lang']
if 'texta_facts' not in documents[analyzation_idx]:
documents[analyzation_idx]['texta_facts'] = []
documents[analyzation_idx]['texta_facts'].extend(analyzation_datum['texta_facts'])
......
from time import sleep
import requests
import logging
import json
......@@ -15,6 +14,12 @@ class Helpers:
yield iterator[i:i + chunk_size]
@staticmethod
def pars_data_string(data: dict):
data = json.loads(data["texts"])
return data
@staticmethod
def divide_tasks_into_chunks(data: dict):
"""
......@@ -40,7 +45,15 @@ class MLPTaskAdapter(object):
self.start_task_url = '{0}/task/start/{1}'.format(mlp_url.strip('/'), mlp_type)
self.task_status_url = "{0}/task/status/{1}"
# Progress management.
self.total_document_count = 0
self.parsed_document_count = 0
# Intermediary task management.
self.tasks = []
self.finished_task_ids = []
# Final return values.
self.analyzation_data = []
self.errors = {}
......@@ -58,9 +71,8 @@ class MLPTaskAdapter(object):
def _poll_task_status(self, task_id: str):
"""
Use the tasks is to get the overall state of the celery task using MLP's
status endpoint. This will be good for reporting any retries, errors and successful
tasks.
Get the state of the celery task using MLP's status endpoint.
This will be good for reporting any retries, errors and successful tasks.
"""
url = self.task_status_url.format(self.mlp_url, task_id)
response = requests.get(url).json()
......@@ -68,14 +80,23 @@ class MLPTaskAdapter(object):
def _handle_pending_status(self, task_index: int):
print("Task is still pending")
pass
def _handle_error_status(self, task_index: int):
print("Task has failed!")
pass
def _handle_success_status(self, task_state):
result = task_state["result"]
self.parsed_document_count += len(result)
self.analyzation_data.extend(result)
self.finished_task_ids.append(task_state["id"])
def process(self, data):
self.total_document_count = len(Helpers.pars_data_string(data))
# Split all the documents into chunk, each chunk becomes a SEPARATE Celery task.
celery_task_chunk = Helpers.divide_tasks_into_chunks(data)
......@@ -93,16 +114,19 @@ class MLPTaskAdapter(object):
# Route all the Celery task results to their respective handlers.
for index, task_state in enumerate(task_states):
if task_state["status"] == "PENDING":
task_status = task_state["status"]
if task_status == "PENDING":
self._handle_pending_status(index)
elif task_state["status"] == "FAILURE":
elif task_status == "FAILURE":
self._handle_error_status(index)
elif task_state["status"] == "SUCCESS":
self.analyzation_data.append(task_state["result"])
del self.tasks[index]
elif task_status == "SUCCESS":
self._handle_success_status(task_state)
# Remove all the tasks that have finished their jobs.
self.tasks = [task for task in self.tasks if task["task"] not in self.finished_task_ids]
sleep(3) # Wait a small amount of time until checking whether the task has finished.
return self.analyzation_data, self.errors
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment