Skip to content

Commit 9d38495

Browse files
authored
Add optimistic concurrency and update+_source options to bulk
1 parent 852d773 commit 9d38495

File tree

3 files changed

+124
-3
lines changed

3 files changed

+124
-3
lines changed

elasticsearch/compat.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
map = map
3333
from queue import Queue
3434

35+
try:
36+
from collections.abs import Mapping
37+
except ImportError:
38+
from collections import Mapping
39+
40+
3541
__all__ = [
3642
"string_types",
3743
"quote_plus",
@@ -41,4 +47,5 @@
4147
"urlparse",
4248
"map",
4349
"Queue",
50+
"Mapping",
4451
]

elasticsearch/helpers/actions.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import time
2020

2121
from ..exceptions import TransportError
22-
from ..compat import map, string_types, Queue
22+
from ..compat import map, string_types, Queue, Mapping
2323

2424
from .errors import ScanError, BulkIndexError
2525

@@ -43,9 +43,22 @@ def expand_action(data):
4343
data = data.copy()
4444
op_type = data.pop("_op_type", "index")
4545
action = {op_type: {}}
46+
47+
# If '_source' is a dict use it for source
48+
# otherwise if op_type == 'update' then
49+
# '_source' should be in the metadata.
50+
if (
51+
op_type == "update"
52+
and "_source" in data
53+
and not isinstance(data["_source"], Mapping)
54+
):
55+
action[op_type]["_source"] = data.pop("_source")
56+
4657
for key in (
4758
"_id",
4859
"_index",
60+
"_if_seq_no",
61+
"_if_primary_term",
4962
"_parent",
5063
"_percolate",
5164
"_retry_on_conflict",
@@ -54,6 +67,8 @@ def expand_action(data):
5467
"_type",
5568
"_version",
5669
"_version_type",
70+
"if_seq_no",
71+
"if_primary_term",
5772
"parent",
5873
"pipeline",
5974
"retry_on_conflict",
@@ -62,13 +77,15 @@ def expand_action(data):
6277
"version_type",
6378
):
6479
if key in data:
65-
if key in [
80+
if key in {
81+
"_if_seq_no",
82+
"_if_primary_term",
6683
"_parent",
6784
"_retry_on_conflict",
6885
"_routing",
6986
"_version",
7087
"_version_type",
71-
]:
88+
}:
7289
action[op_type][key[1:]] = data.pop(key)
7390
else:
7491
action[op_type][key] = data.pop(key)

test_elasticsearch/test_helpers.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,103 @@ class TestChunkActions(TestCase):
7676
def setup_method(self, _):
7777
self.actions = [({"index": {}}, {"some": u"datá", "i": i}) for i in range(100)]
7878

79+
def test_expand_action(self):
80+
self.assertEqual(helpers.expand_action({}), ({"index": {}}, {}))
81+
self.assertEqual(
82+
helpers.expand_action({"key": "val"}), ({"index": {}}, {"key": "val"})
83+
)
84+
85+
def test_expand_action_actions(self):
86+
self.assertEqual(
87+
helpers.expand_action(
88+
{"_op_type": "delete", "_id": "id", "_index": "index"}
89+
),
90+
({"delete": {"_id": "id", "_index": "index"}}, None),
91+
)
92+
self.assertEqual(
93+
helpers.expand_action(
94+
{"_op_type": "update", "_id": "id", "_index": "index", "key": "val"}
95+
),
96+
({"update": {"_id": "id", "_index": "index"}}, {"key": "val"}),
97+
)
98+
self.assertEqual(
99+
helpers.expand_action(
100+
{"_op_type": "create", "_id": "id", "_index": "index", "key": "val"}
101+
),
102+
({"create": {"_id": "id", "_index": "index"}}, {"key": "val"}),
103+
)
104+
self.assertEqual(
105+
helpers.expand_action(
106+
{
107+
"_op_type": "create",
108+
"_id": "id",
109+
"_index": "index",
110+
"_source": {"key": "val"},
111+
}
112+
),
113+
({"create": {"_id": "id", "_index": "index"}}, {"key": "val"}),
114+
)
115+
116+
def test_expand_action_options(self):
117+
for option in (
118+
"_id",
119+
"_index",
120+
"_percolate",
121+
"_timestamp",
122+
"_type",
123+
"if_seq_no",
124+
"if_primary_term",
125+
"parent",
126+
"pipeline",
127+
"retry_on_conflict",
128+
"routing",
129+
"version",
130+
"version_type",
131+
("_parent", "parent"),
132+
("_retry_on_conflict", "retry_on_conflict"),
133+
("_routing", "routing"),
134+
("_version", "version"),
135+
("_version_type", "version_type"),
136+
("_if_seq_no", "if_seq_no"),
137+
("_if_primary_term", "if_primary_term"),
138+
):
139+
if isinstance(option, str):
140+
action_option = option
141+
else:
142+
option, action_option = option
143+
self.assertEqual(
144+
helpers.expand_action({"key": "val", option: 0}),
145+
({"index": {action_option: 0}}, {"key": "val"}),
146+
)
147+
148+
def test__source_metadata_or_source(self):
149+
self.assertEqual(
150+
helpers.expand_action({"_source": {"key": "val"}}),
151+
({"index": {}}, {"key": "val"}),
152+
)
153+
154+
self.assertEqual(
155+
helpers.expand_action(
156+
{"_source": ["key"], "key": "val", "_op_type": "update"}
157+
),
158+
({"update": {"_source": ["key"]}}, {"key": "val"}),
159+
)
160+
161+
self.assertEqual(
162+
helpers.expand_action(
163+
{"_source": True, "key": "val", "_op_type": "update"}
164+
),
165+
({"update": {"_source": True}}, {"key": "val"}),
166+
)
167+
168+
# This case is only to ensure backwards compatibility with old functionality.
169+
self.assertEqual(
170+
helpers.expand_action(
171+
{"_source": {"key2": "val2"}, "key": "val", "_op_type": "update"}
172+
),
173+
({"update": {}}, {"key2": "val2"}),
174+
)
175+
79176
def test_chunks_are_chopped_by_byte_size(self):
80177
self.assertEqual(
81178
100,

0 commit comments

Comments
 (0)