2
\$\begingroup\$

I tried to implement the idea given in the Elasticsearch documentation about reindexing, although I think it should be implemented inside Elasticsearch itself and exposed as an API.

#!/usr/bin/env python

from __future__ import division
from argparse import ArgumentParser, FileType
import json
import requests
from time import sleep


class ElasticSearch(object):
    """Tiny JSON-over-HTTP client for the Elasticsearch calls this script needs.

    Every helper goes through :meth:`request`, which returns the decoded JSON
    body of the response without inspecting the HTTP status; callers look at
    the returned dictionary themselves.
    """

    def __init__(self, url):
        """Remember the server address.

        ``url`` is interpolated as ``http://<url>/<path>``, so it must be
        given without a scheme (for example ``localhost:9200``).
        """
        self.url = url

    def request(self, method, path, data=None):
        """Send ``data`` to ``path`` with ``method`` and return the JSON reply."""
        response = requests.request(
            method, 'http://%s/%s' % (self.url, path),
            data=data,
            headers={'Content-type': 'application/json'})
        return response.json()

    def post(self, path, data):
        """POST ``data`` to ``path`` and return the decoded JSON reply."""
        return self.request('post', path, data)

    def get(self, path, data=None):
        """GET ``path`` (optionally with a body) and return the JSON reply."""
        return self.request('get', path, data)

    def scan_and_scroll(self, index):
        """Yield successive lists of hits covering every document of ``index``.

        Opens a ``search_type=scan`` search with a one minute scroll window
        and a ``match_all`` query, then keeps requesting the next scroll page
        until a page comes back with no hits.
        """
        query = json.dumps({"query": {"match_all": {}}, "size": 100})
        response = self.get('%s/_search?search_type=scan&scroll=1m' % index,
                            data=query)
        while True:
            # Each page carries the scroll id to use for the following page.
            response = self.get('_search/scroll?scroll=1m',
                                data=response['_scroll_id'])
            hits = response['hits']['hits']
            if not hits:
                return
            yield hits

    def set_mapping(self, index, mappings):
        """POST the JSON-encoded ``mappings`` to the ``index`` path."""
        return self.post(index, data=json.dumps(mappings))

    def count(self, index):
        """Return the total hit count of ``index``.

        Returns 0 when the search reply has no ``hits`` key.
        """
        response = self.get('%s/_search' % index)
        return response['hits']['total'] if 'hits' in response else 0

    def bulk_insert(self, index, bulk):
        """Send one ``create`` bulk action per hit of ``bulk`` into ``index``.

        ``bulk`` is an iterable of search hits (dictionaries with ``_type``
        and ``_source``, as yielded by :meth:`scan_and_scroll`).  The hit's
        ``_id`` is forwarded when present so that documents keep their
        identifier in the target index instead of receiving a new one.
        """
        lines = []
        for hit in bulk:
            meta = {'_index': index, '_type': hit['_type']}
            if '_id' in hit:
                meta['_id'] = hit['_id']
            lines.append(json.dumps({'create': meta}))
            lines.append(json.dumps(hit['_source']))
        # The bulk body is newline-delimited and must end with a newline.
        return self.post('_bulk',
                         data=''.join(line + "\n" for line in lines))

    def drop(self, index):
        """Send a DELETE request for ``index`` and return the JSON reply."""
        return self.request('delete', index)

    def alias(self, index, to):
        """PUT ``<index>/_alias/<to>``, i.e. make ``to`` an alias of ``index``."""
        return self.request('put', '%s/_alias/%s' % (index, to))


def change_mapping_and_reindex(elasticsearch, mapping_file, index):
    """Rebuild ``index`` under a new mapping and alias the copy to its name.

    Steps:

    1. Post the mapping to ``<index>-tmp-0`` .. ``<index>-tmp-9`` and keep the
       first name whose reply is acknowledged.
    2. Copy every document of ``index`` into that temporary index.
    3. Re-check the document counts up to 12 times, one second apart.
    4. Once they match, delete ``index`` and alias the temporary index to
       the old name.

    :param elasticsearch: server address passed to ``ElasticSearch``.
    :param mapping_file: readable file object containing the mapping JSON.
    :param index: name of the index to remap.
    :returns: ``False`` when no temporary index could be set up, ``None``
        otherwise (both when the counts never matched and on success).
    """
    import sys  # local import: only needed for the in-place progress line

    es = ElasticSearch(elasticsearch)

    # Parse once; the same mapping is offered to every candidate name.
    mappings = json.loads(mapping_file.read())

    temporary_index = None
    for i in range(10):
        candidate = index + '-tmp-' + str(i)
        print("Setting mapping to %s" % candidate)
        response = es.set_mapping(candidate, mappings)
        if 'acknowledged' in response and response['acknowledged']:
            temporary_index = candidate
            break
    if temporary_index is None:
        print("Can't find a temporary index to work with.")
        return False

    old_index_count = es.count(index)

    for bulk in es.scan_and_scroll(index):
        es.bulk_insert(temporary_index, bulk)
        new_index_count = es.count(temporary_index)
        # count() reports 0 when the search reply has no hits; skip the
        # percentage rather than divide by zero.
        if old_index_count:
            percent = 100.0 * new_index_count / old_index_count
            # "\r" rewrites the same line; flush because there is no newline
            # to trigger line-buffered output.
            sys.stdout.write(("\r%.2f%%" + 10 * " ") % percent)
            sys.stdout.flush()
    print("\nDone")

    # The counts may lag behind the bulk inserts, so poll before giving up.
    max_checks = 12
    for attempt in range(max_checks):
        new_index_count = es.count(temporary_index)
        if new_index_count == old_index_count:
            print("OK, same number of raws in both index.")
            break
        print("Not the same number of raws in old and new... "
              "waiting a bit..."
              "(old=%d, new=%d)" % (old_index_count, new_index_count))
        if attempt == max_checks - 1:
            print("Oh fsck, not the same number of raws in old and new... "
                  "aborting."
                  "(old=%d, new=%d)" % (old_index_count, new_index_count))
            return
        sleep(1)

    # Destructive part: only reached once both counts agree.
    print("Deleting %s" % index)
    es.drop(index)
    print("Aliasing %s to %s" % (temporary_index, index))
    es.alias(temporary_index, index)


def parse_args(argv=None):
    """Parse the command line and return the resulting namespace.

    All three options are required: without any one of them the reindexing
    cannot start, so a usage error is reported up front instead of failing
    later on a ``None`` value.

    :param argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = ArgumentParser(
        description="Remap and reindex the given index, but only if you "
        "stopped writing to it (will fail if you're writing).")
    parser.add_argument('--index', required=True, help='index to remap')
    parser.add_argument('--elasticsearch', required=True, help='ES host')
    parser.add_argument('--mapping', required=True,
                        help='Mapping file, starts with {"mappings"...',
                        type=FileType('r'))
    return parser.parse_args(argv)


if __name__ == '__main__':
    args = parse_args()
    change_mapping_and_reindex(args.elasticsearch, args.mapping, args.index)
\$\endgroup\$
2
  • \$\begingroup\$ As you don't say anything specific about what you are looking for, I assume you just want a general review about all of the aspects? \$\endgroup\$ Commented Jul 14, 2014 at 9:34
  • \$\begingroup\$ Whatever comes first; that is how I see code review. \$\endgroup\$ Commented Jul 14, 2014 at 13:06

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.