Skip to content

Commit

Permalink
Added bulk method.
Browse files Browse the repository at this point in the history
  • Loading branch information
Charl van Niekerk committed Feb 6, 2020
1 parent 4df0f86 commit c1408a1
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
43 changes: 43 additions & 0 deletions elasticmock/fake_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,49 @@ def index(self, index, body, doc_type='_doc', id=None, params=None):
'_index': index
}

@query_params('consistency', 'op_type', 'parent', 'refresh', 'replication',
'routing', 'timeout', 'timestamp', 'ttl', 'version', 'version_type')
def bulk(self, body, params=None):
id = None
version = 1
items = []

for line in body.splitlines():
if len(line.strip()) > 0:
line = json.loads(line)

if 'index' in line:
index = line['index']['_index']
doc_type = line['index']['_type']

if index not in self.__documents_dict:
self.__documents_dict[index] = list()
else:
if id is None:
id = get_random_id()

self.__documents_dict[index].append({
'_type': doc_type,
'_id': id,
'_source': line,
'_index': index,
'_version': version
})

items.append({'index': {
'_type': doc_type,
'_id': id,
'_index': index,
'_version': version,
'result': 'created',
'status': 201
}})

return {
'errors': False,
'items': items
}

@query_params('parent', 'preference', 'realtime', 'refresh', 'routing')
def exists(self, index, doc_type, id, params=None):
result = False
Expand Down
27 changes: 27 additions & 0 deletions tests/test_elasticmock.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-

import json
import unittest
import elasticsearch
from elasticsearch.exceptions import NotFoundError
Expand Down Expand Up @@ -27,6 +28,32 @@ def test_should_index_document(self):
self.assertEqual(1, data.get('_version'))
self.assertEqual(self.index_name, data.get('_index'))

def test_should_bulk_index_documents(self):
action = {'index': {'_index': self.index_name, '_type': self.doc_type}}
action_json = json.dumps(action)
body_json = json.dumps(self.body)
num_of_documents = 10

lines = []
for count in range(0, num_of_documents):
lines.append(action_json)
lines.append(body_json)
body = '\n'.join(lines)

data = self.es.bulk(body=body)
items = data.get('items')

self.assertFalse(data.get('errors'))
self.assertEqual(num_of_documents, len(items))

for item in items:
index = item.get('index')

self.assertEqual(self.doc_type, index.get('_type'))
self.assertEqual(self.index_name, index.get('_index'))
self.assertEqual('created', index.get('result'))
self.assertEqual(201, index.get('status'))

def test_should_index_document_without_doc_type(self):
data = self.es.index(index=self.index_name, body=self.body)

Expand Down

0 comments on commit c1408a1

Please sign in to comment.