# -*- coding: utf8 -*-
import datetime
import json
import logging
from functools import reduce
from typing import Dict, List

import elasticsearch
import requests
from elasticsearch import Elasticsearch, ElasticsearchException
from elasticsearch_dsl import A, Search
from elasticsearch_dsl.query import MoreLikeThis, Q

from permission_admin.models import Dataset
from texta.settings import ERROR_LOGGER, FACT_FIELD, date_format, es_ldap_password, es_ldap_user, es_prefix, es_url, es_use_ldap
from utils.ds_importer_helper import check_for_analyzer
from utils.query_builder import QueryBuilder

# Need to update index.max_inner_result_window to increase the inner hits limit.
HEADERS = {'Content-Type': 'application/json'}


class ES_Manager:
    """ Manage Elasticsearch operations and interface
    """

    HEADERS = HEADERS
    TEXTA_RESERVED = [FACT_FIELD]
    TEXTA_META_FIELDS = ['_es_id']

    # Redefine requests if LDAP authentication is used
    if es_use_ldap:
        requests = requests.Session()
        requests.auth = (es_ldap_user, es_ldap_password)
    else:
        requests = requests

    def __init__(self, active_datasets, url=None):
        self.es_url = url if url else es_url
        self.active_datasets = active_datasets
        self.combined_query = None
        self._facts_map = None
        self.clear_readonly_block()

    def stringify_datasets(self) -> str:
        """
        Return a comma separated string of index names for
        Elasticsearch multi-index operations.
        :return: Comma-separated string of index names.
        """
        indices = [dataset.index for dataset in self.active_datasets]
        index_string = ','.join(indices)
        return index_string
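
    # Illustrative sketch (hypothetical index names, not from the original source): if the
    # active datasets point to the indices "articles_2018" and "articles_2019", this method
    # returns "articles_2018,articles_2019", which Elasticsearch accepts in multi-index URLs
    # such as GET /articles_2018,articles_2019/_search.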

    @staticmethod
    def get_analyzers():
        ELASTICSEARCH_ANALYZERS = [
            {"display_name": "Standard Analyzer", "analyzer": "standard"},
            {"display_name": "Whitespace Analyzer", "analyzer": "whitespace"},
            {"display_name": "Pattern Analyzer", "analyzer": "pattern"},
            {"display_name": "Simple Analyzer", "analyzer": "simple"},
            {"display_name": "Stop Analyzer", "analyzer": "stop"},
            {"display_name": "Keyword Analyzer", "analyzer": "keyword"},
            {"display_name": "Fingerprint Analyzer", "analyzer": "fingerprint"},
        ]

        estonian_analyzer = check_for_analyzer(display_name="Estonian Analyzer", analyzer_name="estonian", es_url=es_url)
        if estonian_analyzer: ELASTICSEARCH_ANALYZERS.append(estonian_analyzer)
        return ELASTICSEARCH_ANALYZERS

    def bulk_post_update_documents(self, documents, ids):
        """Do both plain_post_bulk and update_documents()"""
        data = ''

        for i, _id in enumerate(ids):
            data += json.dumps({"update": {"_id": _id, "_index": self.stringify_datasets()}}) + '\n'
            data += json.dumps({"doc": documents[i]}) + '\n'

        response = self.plain_post_bulk(self.es_url, data)
        response = self.update_documents()
        return response

    def bulk_post_documents(self, documents, ids, document_locations):
        """Do just plain_post_bulk"""
        data = ''

        for i, _id in enumerate(ids):
            data += json.dumps({"update": {"_id": _id, "_index": document_locations[i]['_index'], "_type": document_locations[i]['_type']}}) + '\n'
            data += json.dumps({"doc": documents[i]}) + '\n'

        response = self.plain_post_bulk(self.es_url, data)

        return response
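
    # The body assembled above is newline-delimited JSON (NDJSON), as expected by the _bulk
    # endpoint; a sketch with hypothetical values:
    #   {"update": {"_id": "abc123", "_index": "articles_2018", "_type": "doc"}}
    #   {"doc": {"title": "Updated title"}}
    # Every line, including the last one, is terminated with '\n', which _bulk requires.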

    def _get_wildcard_index_names(self, wildcarded_string: str) -> List[str]:
        """
        Takes in a wildcarded string and returns a list of indices that match
        the pattern. The alias endpoint is used because it returns the least data
        over the network.
        :return: List of index names that match the pattern.
        """
        url = "{}/{}/_alias".format(self.es_url, wildcarded_string)
        response = requests.get(url=url).json()
        return list(response.keys())
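
    # Illustrative sketch (hypothetical names): for a wildcarded_string of "texta_*", the
    # _alias response is keyed by the concrete index names, so this method would return
    # something like ["texta_articles", "texta_comments"].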

    def update_mapping_structure(self, new_field, new_field_properties):
        url = '{0}/{1}/_mappings/'.format(self.es_url, self.stringify_datasets())
        get_response = self.plain_get(url)

        for index in self._get_wildcard_index_names(self.stringify_datasets()):
            for mapping in get_response[index]['mappings'].keys():
                properties = get_response[index]['mappings'][mapping]['properties']

                if new_field not in properties:
                    properties[new_field] = new_field_properties

                properties = {'properties': properties}
                url = '{0}/{1}/_mapping/{2}'.format(self.es_url, index, mapping)
                put_response = self.plain_put(url, json.dumps(properties))

    def update_documents(self):
        response = self.plain_post(
            '{0}/{1}/_update_by_query?refresh&conflicts=proceed'.format(self.es_url, self.stringify_datasets()))
        return response

    def update_documents_by_id(self, ids: List[str]):
        query = json.dumps({"query": {"terms": {"_id": ids}}})
        response = self.plain_post(
            '{0}/{1}/_update_by_query?conflicts=proceed'.format(self.es_url, self.stringify_datasets()), data=query)
        return response

    def _decode_mapping_structure(self, structure, root_path=list(), nested_layers=list()):
        """ Decode mapping structure (nested dictionary) to a flat structure
        """
        mapping_data = []

        for k, v in structure.items():
            # deal with fact field
            if 'properties' in v and k in self.TEXTA_RESERVED:
                sub_structure = v['properties']
                path_list = root_path[:]
                path_list.append(k)
                sub_mapping = [{'path': k, 'type': 'fact'}]
                mapping_data.extend(sub_mapping)

            # deal with object & nested structures
            elif 'properties' in v and k not in self.TEXTA_RESERVED:
                sub_structure = v['properties']

                # add layer path
                nested_layers_updated = nested_layers[:]
                if 'type' in v:
                    if v['type'] == 'nested':
                        nested_layers_updated.append(k)

                path_list = root_path[:]
                path_list.append(k)
                sub_mapping = self._decode_mapping_structure(sub_structure, root_path=path_list,
                                                             nested_layers=nested_layers_updated)
                mapping_data.extend(sub_mapping)

            else:
                path_list = root_path[:]
                path_list.append(k)
                path = '.'.join(path_list)
                data = {'path': path, 'type': v['type'], 'nested_layers': nested_layers}
                mapping_data.append(data)

        return mapping_data
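
    # Minimal sketch of the flattening (hypothetical mapping): an input of
    #   {"title": {"type": "text"}, "author": {"properties": {"name": {"type": "keyword"}}}}
    # decodes to
    #   [{'path': 'title', 'type': 'text', 'nested_layers': []},
    #    {'path': 'author.name', 'type': 'keyword', 'nested_layers': []}]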

    @staticmethod
    def plain_get(url) -> dict:
        return ES_Manager.requests.get(url, headers=HEADERS).json()

    @staticmethod
    def plain_post(url, data=None) -> dict:
        return ES_Manager.requests.post(url, data=data, headers=HEADERS).json()

    @staticmethod
    def plain_post_bulk(url, data) -> dict:
        return ES_Manager.requests.post('{0}/_bulk'.format(url), data=data, headers=HEADERS).json()

    @staticmethod
    def plain_put(url, data=None) -> dict:
        return ES_Manager.requests.put(url, data=data, headers=HEADERS).json()

    @staticmethod
    def plain_delete(url, data=None) -> dict:
        return ES_Manager.requests.delete(url, data=data, headers=HEADERS).json()

    @staticmethod
    def plain_search(es_url, datasets, query) -> dict:
        return ES_Manager.requests.post(es_url + '/' + datasets + '/_search', data=json.dumps(query),
                                        headers=HEADERS).json()

    @staticmethod
    def plain_multisearch(es_url, data):
        responses = ES_Manager.requests.post(es_url + '/_msearch', data='\n'.join(data) + '\n', headers=HEADERS).json()
        if 'responses' in responses:
            return responses['responses']
        else:
            return []

    @staticmethod
    def plain_scroll(es_url, dataset, mapping, query, expiration_str='1m'):
        url = es_url + '/' + dataset + '/' + mapping + '/_search?scroll=' + expiration_str
        return ES_Manager.requests.post(url, data=query, headers=HEADERS).json()

    @staticmethod
    def delete_index(index):
        url = '{0}/{1}'.format(es_url, index)
        ES_Manager.requests.delete(url, headers=HEADERS)
        return True

    @staticmethod
    def clear_scroll(scroll_id):
        url = '{0}/_search/scroll'.format(es_url)
        query = json.dumps({"scroll_id": scroll_id})
        return ES_Manager.requests.delete(url, data=query, headers=HEADERS)

    def get_fields_with_facts(self):
        queries = []

        fact_types_with_queries = {
            'fact': {'match_all': {}},
            'fact_str': {'nested': {'path': FACT_FIELD, 'query': {'exists': {'field': '{}.str_val'.format(FACT_FIELD)}},
                                    'inner_hits': {}}},
            'fact_num': {'nested': {'path': FACT_FIELD, 'query': {'exists': {'field': '{}.num_val'.format(FACT_FIELD)}},
                                    'inner_hits': {}}}
        }

        for fact_type, query in fact_types_with_queries.items():
            for active_dataset in self.active_datasets:
                aggs = {
                    fact_type: {
                        "nested": {"path": FACT_FIELD},
                        "aggs": {
                            fact_type: {
                                'terms': {"field": "{}.doc_path".format(FACT_FIELD),
                                          'order': {'documents.doc_count': 'desc'}},
                                "aggs": {
                                    "documents": {"reverse_nested": {}}
                                }
                            }
                        }
                    }
                }

                query_header = {'index': active_dataset.index, 'mapping': active_dataset.mapping}
                query_body = {'query': query, 'aggs': aggs}
                queries.append(json.dumps(query_header))
                queries.append(json.dumps(query_body))

        responses = self.plain_multisearch(es_url, queries)
        fields_with_facts = {'fact': [], 'fact_str': [], 'fact_num': []}

        for response in responses:
            if 'aggregations' in response:
                aggregations = response['aggregations']
                for fact_type in list(fields_with_facts.keys()):
                    if fact_type in aggregations:
                        second_agg = aggregations[fact_type]
                        if fact_type in second_agg:
                            buckets = second_agg[fact_type]['buckets']
                            fields_with_facts[fact_type] += [a['key'] for a in buckets]

        return fields_with_facts

    @staticmethod
    def _parse_buckets(response, key):
        fact_count = response['aggregations'][key]
        if key in fact_count:
            return [bucket['key'] for bucket in fact_count[key]['buckets']]
        return []

    @staticmethod
    def handle_composition_aggregation(search: Search, aggregation_dict: dict, after: dict):
        s = Search().from_dict(search).using(Elasticsearch(es_url))
        sources = aggregation_dict["sources"]
        size = aggregation_dict.get("size", 10)

        aggregations = [{source["bucket_name"]: A(source["agg_type"], field="{}.keyword".format(source["field"]))} for source in sources]
        if after:
            s.aggs.bucket(aggregation_dict["bucket_name"], "composite", size=size, sources=aggregations, after=after)
            return s
        else:
            s.aggs.bucket(aggregation_dict["bucket_name"], "composite", size=size, sources=aggregations)
            return s
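
    # Hypothetical shape of aggregation_dict for the composite aggregation above (field and
    # bucket names are illustrative):
    #   {"bucket_name": "by_author", "agg_type": "composite", "size": 10,
    #    "sources": [{"bucket_name": "author", "agg_type": "terms", "field": "author"}]}
    # An optional "after_key" entry is used by more_like_this() for paging through buckets.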

    @staticmethod
    def more_like_this(elastic_url, fields: list, like: list, size: int, filters: list, aggregations: list, include: bool, if_agg_only: bool, dataset: Dataset, return_fields=None):
        # Create the base Search object and bind it to the Elasticsearch connection.
        search = Search(using=Elasticsearch(elastic_url)).index(dataset.index).doc_type(dataset.mapping)
        mlt = MoreLikeThis(like=like, fields=fields, min_term_freq=1, max_query_terms=12, include=include)  # Prepare the MLT part of the query.

        paginated_search = search[0:size]  # Set how many documents to return.
        limited_search = paginated_search.source(return_fields) if return_fields else paginated_search  # If added, choose which FIELDS to return.
        finished_search = limited_search.query(mlt)  # Add the premade MLT into the query.

        # Apply all user-set filters; if none were given, the list is empty and the loop is skipped.
        for filter_dict in filters:
            finished_search = finished_search.filter(Q(filter_dict))

        # Apply all user-set aggregations; if none were given, the list is empty and the loop is skipped.
        for aggregation_dict in aggregations:
            # aggs.bucket() does not return a new Search object; it mutates the existing one in place.
            if aggregation_dict["agg_type"] == "composite":
                after = aggregation_dict.get("after_key", None)
                finished_search = ES_Manager.handle_composition_aggregation(finished_search.to_dict(), aggregation_dict, after)
            else:
                field_name = aggregation_dict["field"]
                index = like[0]["_index"]
                field = "{}.keyword".format(field_name) if ES_Manager.is_field_text_field(field_name=field_name, index_name=index) else field_name
                finished_search.aggs.bucket(name=aggregation_dict["bucket_name"], agg_type=aggregation_dict["agg_type"], field=field)

        # Choose whether to return only the aggregations, in {"bucket_name": {results...}} format.
        if if_agg_only:
            finished_search = finished_search.params(size=0)
            response = finished_search.execute()
            return response.aggs.to_dict()

        try:
            response = finished_search.execute()
            result = {"hits": [hit.to_dict() for hit in response]}  # Throw out all metadata and keep only the documents.
            if response.aggs: result.update({"aggregations": response.aggs.to_dict()})  # If the aggregation query returned anything, add the "aggregations" key with its results.
            return result

        except ElasticsearchException as e:
            logging.getLogger(ERROR_LOGGER).exception(e)
            return {"elasticsearch": [str(e)]}
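
    # Hypothetical call (argument values are illustrative, not from the original source):
    #   ES_Manager.more_like_this(es_url, fields=["title", "body"],
    #                             like=[{"_index": "articles_2018", "_type": "doc", "_id": "abc123"}],
    #                             size=10, filters=[], aggregations=[], include=False,
    #                             if_agg_only=False, dataset=some_dataset)
    # returns {"hits": [...]}, plus an "aggregations" key when aggregations were requested.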
rsirel's avatar
rsirel committed
330

    def get_mapped_fields(self):
        """ Get flat structure of fields from Elasticsearch mappings
        """
        mapping_data = {}

        if self.active_datasets:
            index_string = self.stringify_datasets()
            url = '{0}/{1}'.format(es_url, index_string)

            for index_name, index_properties in self.plain_get(url).items():
                for mapping in index_properties['mappings']:
                    mapping_structure = index_properties['mappings'][mapping]['properties']
                    decoded_mapping_structure = self._decode_mapping_structure(mapping_structure)
                    for field_mapping in decoded_mapping_structure:
                        field_mapping_json = json.dumps(field_mapping)
                        if field_mapping_json not in mapping_data:
                            mapping_data[field_mapping_json] = []

                        dataset_info = {'index': index_name, 'mapping': mapping}
                        if dataset_info not in mapping_data[field_mapping_json]:
                            mapping_data[field_mapping_json].append({'index': index_name, 'mapping': mapping})

        return mapping_data

    def get_column_names(self, facts=False) -> list:
        """ Get Column names from flat mapping structure
            Returns: sorted list of names
        """
        mapped_fields = self.get_mapped_fields()
        mapped_fields = [json.loads(field_data) for field_data in list(mapped_fields.keys())]
        if facts:
            column_names = [c['path'] for c in mapped_fields]
        else:
            column_names = [c['path'] for c in mapped_fields if not self._is_reserved_field(c['path'])]
        # Add meta fields
        column_names.sort()
        column_names += self.TEXTA_META_FIELDS
        return column_names

    def _is_reserved_field(self, field_name):
        """ Check if a field is a TEXTA reserved name
        """
        reserved = False
        for r in self.TEXTA_RESERVED:
            if r in field_name:
                reserved = True
        return reserved

    def build(self, es_params):
        self.combined_query = QueryBuilder(es_params).query

    def get_combined_query(self):
        return self.combined_query

    def load_combined_query(self, combined_query):
        self.combined_query = combined_query

    def set_query_parameter(self, key, value):
        """ Set query[key] = value in the main query structure
        """
        self.combined_query['main'][key] = value

    def _check_if_qmain_is_empty(self):
        _must = len(self.combined_query['main']["query"]["bool"]["must"])
        _should = len(self.combined_query['main']["query"]["bool"]["should"])
        _must_not = len(self.combined_query['main']["query"]["bool"]["must_not"])
        return _must == 0 and _should == 0 and _must_not == 0

    def _check_if_qfacts_is_empty(self):
        _include = self.combined_query['facts']['total_include']
        _exclude = self.combined_query['facts']['total_exclude']
        return _include == 0 and _exclude == 0

    def is_combined_query_empty(self):
        _empty_main = self._check_if_qmain_is_empty()
        _empty_facts = self._check_if_qfacts_is_empty()
        return _empty_main and _empty_facts

    @staticmethod
    def _merge_maps(temp_map_list, union=False):
        final_map = {}
        key_set_list = [set(m.keys()) for m in temp_map_list]
        if union:
            intersection_set = reduce(lambda a, b: a | b, key_set_list)
        else:
            intersection_set = reduce(lambda a, b: a & b, key_set_list)
        # Merge all maps:
        for k in intersection_set:
            for m in temp_map_list:
                if k not in final_map:
                    final_map[k] = {}
                for sub_k in m.get(k, {}):
                    if sub_k not in final_map[k]:
                        final_map[k][sub_k] = []
                    final_map[k][sub_k].extend(m[k][sub_k])
        return final_map
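
    # Worked sketch (hypothetical maps): merging
    #   [{'NER': {'doc1': ['PER']}}, {'NER': {'doc2': ['ORG']}}]
    # gives {'NER': {'doc1': ['PER'], 'doc2': ['ORG']}}. With union=False only keys present
    # in every map are considered; with union=True the union of all keys is used.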

    def search(self):
        """ Search
        """
        q = json.dumps(self.combined_query['main'])
        search_url = '{0}/{1}/_search'.format(es_url, self.stringify_datasets())
        response = self.plain_post(search_url, q)
        return response

    def process_bulk(self, hits):
        data = ''
        for hit in hits:
            data += json.dumps({"delete": {"_index": self.stringify_datasets(), "_id": hit['_id']}}) + '\n'
        return data

    def delete(self, time_out='1m'):
        """ Deletes the selected rows
        """

        q = json.dumps(self.combined_query['main'])
        search_url = '{0}/{1}/_search?scroll={2}'.format(es_url, self.stringify_datasets(), time_out)
        response = requests.post(search_url, data=q, headers=HEADERS).json()

        scroll_id = response['_scroll_id']
        total_hits = response['hits']['total']

        # Delete initial response
        data = self.process_bulk(response['hits']['hits'])
        delete_url = '{0}/{1}/_bulk'.format(es_url, self.stringify_datasets())
        deleted = requests.post(delete_url, data=data, headers=HEADERS)

        while total_hits > 0:
            response = self.scroll(scroll_id=scroll_id, time_out=time_out)
            total_hits = len(response['hits']['hits'])
            scroll_id = response['_scroll_id']
            data = self.process_bulk(response['hits']['hits'])
            delete_url = '{0}/{1}/_bulk'.format(es_url, self.stringify_datasets())
            deleted = requests.post(delete_url, data=data, headers=HEADERS)
        return True

    def add_document(self, document):
        """ Indexes given json document
        """
        document = json.dumps(document)
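        # Note: the URL below relies on self.index and self.mapping, which are not set in
        # __init__ here and are assumed to be assigned on the instance before this is called.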
        url = '{0}/{1}/{2}/'.format(es_url, self.index, self.mapping)
        response = self.plain_post(url, data=document)
        return True

    def scroll(self, scroll_id=None, time_out='1m', id_scroll=False, field_scroll=False, size=100, match_all=False):
        """ Search and Scroll
        """
        if scroll_id:
            q = json.dumps({"scroll": time_out, "scroll_id": scroll_id})
            search_url = '{0}/_search/scroll'.format(es_url)
        else:
            if match_all is True:
                q = {}
            else:
                q = self.combined_query['main']
            q['size'] = size
            search_url = '{0}/{1}/_search?scroll={2}'.format(es_url, self.stringify_datasets(), time_out)

            if id_scroll:
                q['_source'] = 'false'
            elif field_scroll:
                q['_source'] = field_scroll

            q = json.dumps(q)

        response = self.requests.post(search_url, data=q, headers=HEADERS).json()
        return response

    def get_total_documents(self):
        q = self.combined_query['main']
        response = self.plain_search(es_url=es_url, datasets=self.stringify_datasets(), query=q)
        total = response['hits']['total']
        return int(total)

    @staticmethod
    def get_indices():
        url = '{0}/_cat/indices?format=json'.format(es_url)
        response = ES_Manager.requests.get(url, headers=HEADERS).json()
        indices = sorted(
            [{'index': i['index'], 'status': i['status'], 'docs_count': i['docs.count'], 'store_size': i['store.size']}
             for i in response], key=lambda k: k['index'])

        # Filter according to prefix
        if es_prefix:
            indices = [i for i in indices if i['index'].startswith(es_prefix)]
        return indices

    @staticmethod
    def get_mappings(index):
        url = '{0}/{1}'.format(es_url, index)
        response = ES_Manager.requests.get(url, headers=HEADERS).json()

        return sorted([mapping for mapping in response[index]['mappings']])

    @staticmethod
    def open_index(index):
        url = '{0}/{1}/_open'.format(es_url, index)
        response = ES_Manager.requests.post(url, headers=HEADERS).json()
        return response

    @staticmethod
    def close_index(index):
        url = '{0}/{1}/_close'.format(es_url, index)
        response = ES_Manager.requests.post(url, headers=HEADERS).json()
        return response

    def merge_combined_query_with_query_dict(self, query_dict):
        """ Merge the current query with the provided query
            Merges the dictionaries entry-wise and uses conjunction in boolean queries, where needed. Alters the current query in place.
        """

        try:
            query_dict['main']['query']['bool']
        except (KeyError, TypeError):
            raise Exception('Incompatible queries.')

        if 'must' in query_dict['main']['query']['bool'] and query_dict['main']['query']['bool']['must']:
            for constraint in query_dict['main']['query']['bool']['must']:
                self.combined_query['main']['query']['bool']['must'].append(constraint)
        if 'should' in query_dict['main']['query']['bool'] and query_dict['main']['query']['bool']['should']:
            if len(query_dict['main']['query']['bool']['should']) > 1:
                target_list = []
                self.combined_query['main']['query']['bool']['must'].append({'or': target_list})
            else:
                target_list = self.combined_query['main']['query']['bool']['must']
            for constraint in query_dict['main']['query']['bool']['should']:
                target_list.append(constraint)
        if 'must_not' in query_dict['main']['query']['bool'] and query_dict['main']['query']['bool']['must_not']:
            for constraint in query_dict['main']['query']['bool']['must_not']:
                self.combined_query['main']['query']['bool']['must_not'].append(constraint)

    def more_like_this_search(self, fields, stopwords=[], docs_accepted=[], docs_rejected=[], handle_negatives='ignore', search_size=10, build_search_query=None):
        # Get ids from basic search
        self.combined_query['main'] = build_search_query
        docs_search = self._scroll_doc_ids()

        # Combine ids from basic search and mlt search
        docs_search = [json.dumps(d) for d in docs_search]
        docs_accepted = [json.dumps(d) for d in docs_accepted]

        docs_combined = list(set().union(docs_search, docs_accepted))
        docs_combined = [json.loads(d) for d in docs_combined]

        mlt = {
            "more_like_this": {
                "fields": fields,
                "like": docs_combined,
                "min_term_freq": 1,
                "max_query_terms": 12,
            }
        }

        if stopwords:
            mlt["more_like_this"]["stop_words"] = stopwords

        highlight_fields = {}
        for field in fields:
            highlight_fields[field] = {}

        query = {
            "query": {
                "bool": {
                    "must": [mlt]
                }
            },
            "size": search_size,
            "highlight": {
                "pre_tags": ["<b>"],
                "post_tags": ["</b>"],
                "fields": highlight_fields
            }
        }

        if docs_rejected:
            if handle_negatives == 'unlike':
                mlt["more_like_this"]["unlike"] = self._add_doc_ids_to_query(docs_rejected)
            elif handle_negatives == 'ignore':
                rejected = [{'ids': {'values': docs_rejected}}]
                query["query"]["bool"]["must_not"] = rejected

        response = ES_Manager.plain_search(self.es_url, self.stringify_datasets(), query)

        return response

    def _add_doc_ids_to_query(self, ids):
        return [{"_index": self.stringify_datasets(), "_id": doc_id} for doc_id in ids]

    def _scroll_doc_ids(self, limit=500):
        ids = []

        response = self.scroll(id_scroll=True, size=100)
        scroll_id = response['_scroll_id']

        for hit in response['hits']['hits']:
            ids.append({'_index': hit['_index'], '_type': hit['_type'], '_id': hit['_id']})
            if len(ids) == limit:
                return ids

        return ids

    def perform_queries(self, queries):
        response = ES_Manager.plain_multisearch(self.es_url, queries)
        return response

    def perform_query(self, query):
        response = ES_Manager.plain_search(self.es_url, self.stringify_datasets(), query)
        return response

    def get_extreme_dates(self, field):
        query = {"aggs": {"max_date": {"max": {"field": field}},
                          "min_date": {"min": {"field": field, 'format': 'yyyy-MM-dd'}}}}
        url = "{0}/{1}/_search".format(self.es_url, self.stringify_datasets())
        response = requests.post(url, data=json.dumps(query), headers=HEADERS).json()
        aggs = response["aggregations"]

        _min = self._timestamp_to_str(aggs["min_date"]["value"])
        _max = self._timestamp_to_str(aggs["max_date"]["value"])

        return _min, _max

    @staticmethod
    def _timestamp_to_str(timestamp):
        date_object = datetime.date.fromtimestamp(timestamp / 1000)
        return datetime.date.strftime(date_object, date_format)

    def clear_readonly_block(self):
        """Set the read_only_allow_delete property of the active indices to false."""
        data = {"index": {"blocks": {"read_only_allow_delete": "false"}}}
        url = "{0}/{1}/_settings".format(self.es_url, self.stringify_datasets())
        response = self.plain_put(url, json.dumps(data))
        return response

    def get_mapping_schema(self) -> dict:
        """
        Using the _mapping endpoint of Elasticsearch, returns the mapping dictionary
        of all the indices specified. Supports multi-index.
        :return: Mappings of the doc_types.
        """
        endpoint_url = '{0}/{1}/_mapping'.format(es_url, self.stringify_datasets())
        response = self.plain_get(endpoint_url)
        return response

    def get_document_count(self, query: dict) -> int:
        """
        Returns how many documents are returned from a query using Elasticsearch's
        _count API.
        :param query: Dictionary of the search query sent to Elasticsearch.
        :return: Integer of the amount of documents that correspond to the search query.
        """
        endpoint_url = '{0}/{1}/_count'.format(es_url, self.stringify_datasets())
        response = ES_Manager.requests.get(url=endpoint_url, json=query).json()
        return response['count']
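
    # Illustrative sketch: get_document_count({"query": {"match_all": {}}}) returns the total
    # number of documents across the active indices as an int.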

    @staticmethod
    def single_index_count(index_name: str):
        es = Elasticsearch(es_url)
        status = es.cat.indices(index=index_name, h="status").strip()
        if status == "open":
            count = Search(using=es, index=index_name).count()
            return count
        else:
            return None


    def get_field_mappings(self) -> dict:
        """
        Uses the _mapping endpoint to fetch the mapping data of ALL the fields in
        the specified indices. This includes Elasticsearch's built-in values like "_id" and "_source".
        :return: Raw field-mapping response for all fields in the active indices.
        """
        url_endpoint = "{0}/{1}/_mapping/*/field/*".format(self.es_url, self.stringify_datasets())
        response = requests.get(url_endpoint).json()

        return response

    def add_is_nested_to_fields(self, nested_fields, fields_and_types: List[Dict], field_name_key='full_path'):
        """
        Given a list of dictionaries where the keys are field names,
        adds a value that determines whether that field is of the nested datatype.
        :param nested_fields: List of field names known to be of the nested datatype.
        :param field_name_key: Key name that contains the field name.
        :param fields_and_types: List of dictionaries that contain ES field names (including dot notation).
        :return: The same dictionaries with an added 'is_nested' boolean.
        """
        new_list = []

        for field_dict in fields_and_types:
            # A field counts as nested if any known nested field name appears in its path.
            field_dict['is_nested'] = any(
                nested_field_name in field_dict[field_name_key]
                for nested_field_name in nested_fields
            )
            new_list.append(field_dict)

        return new_list
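
    # Illustrative sketch (hypothetical field names): with nested_fields=["texta_facts"] and
    # fields_and_types=[{"full_path": "texta_facts.str_val"}, {"full_path": "title"}], the
    # first dictionary gets 'is_nested': True and the second 'is_nested': False.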

    def get_nested_field_names(self, remove_duplicate_keys=False) -> list:
        """
        Traverses the doc_type's mapping schema to return
        a list with unique field names of fields that are of the nested datatype.
        Supports multiple indices.
        :return: List of unique nested field names.
        """
        index_mapping = self.get_mapping_schema()
        nested_field_names = []

        for index_name, index_dict in index_mapping.items():
            for mapping_name, mapping_dict in index_dict['mappings'].items():
                for field_name, field_dict in mapping_dict['properties'].items():
                    if field_dict.get('type', None) == "nested":
                        nested_field_names.append(field_name)

        without_duplicates = list(set(nested_field_names))
        return without_duplicates

    def get_filtered_field_mappings(self, es_field_mappings: dict) -> dict:
        """
        Given the results of the _mapping endpoint for fields,
        removes all keys that contains built-in ES values.
        :return: Dictionary of field mappings without the built-in ES fields.
        """
        elastic_keys = ["_seq_no", "_mapping", "_id", "_version", "_uid", "_type", "_source", "_field_names", "_all",
                        "_index", "_parent", "_routing"]
        filtered_dict = dict()

        for index_name, index_dict in es_field_mappings.items():
            for mapping_name, mappings_dict in index_dict['mappings'].items():
                for field_name, field_dict in mappings_dict.items():
                    if field_name not in elastic_keys:
                        filtered_dict[field_name] = field_dict

        return filtered_dict

    def split_nested_fields(self, fields_and_types: List[Dict]):
        nested_fields = []
        normal_fields = []

        for field in fields_and_types:
            if field.get('is_nested', None) is True:
                nested_fields.append(field)
            elif field.get('is_nested', None) is False:
                normal_fields.append(field)

        return normal_fields, nested_fields
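
    # Typical flow (a sketch): run add_is_nested_to_fields() first so every field dictionary
    # carries an 'is_nested' flag, then use split_nested_fields() to separate them into
    # (normal_fields, nested_fields) for the caller.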


    @staticmethod
    def is_field_text_field(field_name, index_name):
        text_types = ["text", "keyword"]
        es = Elasticsearch(es_url)
        mapping = es.indices.get_field_mapping(fields=[field_name], index=[index_name])
        field_type = mapping[index_name]["mappings"][index_name][field_name]["mapping"][field_name]["type"]
        return field_type in text_types