Skip to content

Commit

Permalink
Add keys flag (#68)
Browse files Browse the repository at this point in the history
* Add keys flag
  • Loading branch information
moshe authored Jun 29, 2019
1 parent 853ae3c commit 9b4699e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
1 change: 1 addition & 0 deletions elasticsearch_loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def log(sevirity, msg):
@click.option('--index-settings-file', type=click.File('rb'), help='Specify path to json file containing index mapping and settings, creates index if missing')
@click.option('--timeout', type=float, help='Specify request timeout in seconds for Elasticsearch client', default=10)
@click.option('--encoding', type=str, help='Specify content encoding for input files', default='utf-8')
@click.option('--keys', type=str, help='Comma separated keys to pick from each document', default='', callback=lambda c, p, v: [x for x in v.split(',') if x])
@click.pass_context
def cli(ctx, **opts):
ctx.obj = opts
Expand Down
8 changes: 6 additions & 2 deletions elasticsearch_loader/iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ def grouper(iterable, n, fillvalue=None):

def bulk_builder(bulk, config):
for item in filter(None, bulk):
source = item
if config['keys']:
source = {x: y for x, y in item.items() if x in config['keys']}

body = {'_index': config['index'],
'_type': config['type'],
'_source': item}
'_source': source}

if config['id_field']:
body['_id'] = item[config['id_field']]
Expand All @@ -28,7 +32,7 @@ def bulk_builder(bulk, config):
if config['update']:
# default _op_type is 'index', which will overwrites existing doc
body['_op_type'] = 'update'
body['doc'] = item
body['doc'] = source
del body['_source']

yield body
Expand Down
3 changes: 2 additions & 1 deletion test/test_json.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import mock
from click.testing import CliRunner

from elasticsearch_loader import cli
import mock


def invoke(*args, **kwargs):
Expand Down

0 comments on commit 9b4699e

Please sign in to comment.