Commit d274dd0a authored by mkollo's avatar mkollo
Browse files

Stability changes to MLP adapter

parent 5fd1cd60
......@@ -52,6 +52,7 @@ class MLPTaskAdapter(object):
# Intermediary task management.
self.tasks = []
self.finished_task_ids = []
self.failed_task_ids = []
# Final return values.
self.analyzation_data = []
......@@ -74,20 +75,18 @@ class MLPTaskAdapter(object):
Get the state of the celery task using MLP's status endpoint.
This will be good for reporting any retries, errors and successful tasks.
url = self.task_status_url.format(self.mlp_url, task_id)
response = requests.get(url).json()
return response
url = self.task_status_url.format(self.mlp_url.strip("/"), task_id)
response = requests.get(url)
print(url, response, response.text)
return response.json()
def _handle_pending_status(self, task_index: int):
def _handle_error_status(self, task_status: dict):
def _handle_error_status(self, task_index: int):
def _handle_success_status(self, task_state):
def _handle_success_status(self, task_state: dict):
result = task_state["result"]
self.parsed_document_count += len(result)
......@@ -116,17 +115,15 @@ class MLPTaskAdapter(object):
for index, task_state in enumerate(task_states):
task_status = task_state["status"]
if task_status == "PENDING":
elif task_status == "FAILURE":
if task_status == "FAILURE":
elif task_status == "SUCCESS":
# Remove all the tasks that have finished their jobs.
self.tasks = [task for task in self.tasks if task["task"] not in self.finished_task_ids]
# Remove all the tasks that have finished their jobs or failed turning it.
self.tasks = [task for task in self.tasks if task["task"] not in self.finished_task_ids and task["task"] not in self.failed_task_ids]
sleep(3) # Wait a small amount of time until checking wheter the task has finished.
return self.analyzation_data, self.errors
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment