From a1ed1abbdea5cf05aa602f27aa5c700a08c88167 Mon Sep 17 00:00:00 2001
From: "Daniel (dB.) Doubrovkine"
Date: Fri, 17 Nov 2023 13:18:42 -0500
Subject: [PATCH] Fix: TypeError on calling parallel_bulk. (#601)

* Fix: TypeError on calling parallel_bulk.

Signed-off-by: dblock

* Added a sample that uses a bulk function generator.

Signed-off-by: dblock

---------

Signed-off-by: dblock
Signed-off-by: roma2023
---
 CHANGELOG.md                     |  1 +
 guides/bulk.md                   | 58 +++++++++++++++++++
 opensearchpy/helpers/actions.py  | 11 +++-
 samples/bulk/bulk-helpers.py     | 52 ++++++++++++++++-
 .../test_helpers/test_actions.py | 53 ++++++++++++++++-
 5 files changed, 172 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bea060e1..794df51c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 ### Deprecated
 ### Removed
 ### Fixed
+- Fix `TypeError` on `parallel_bulk` ([#601](https://github.com/opensearch-project/opensearch-py/pull/601))
 ### Security
 
 ## [2.4.1]

diff --git a/guides/bulk.md b/guides/bulk.md
index ef6b8c5d..251be4f8 100644
--- a/guides/bulk.md
+++ b/guides/bulk.md
@@ -1,6 +1,8 @@
 - [Bulk Indexing](#bulk-indexing)
   - [Line-Delimited JSON](#line-delimited-json)
   - [Bulk Helper](#bulk-helper)
+  - [Parallel Bulk](#parallel-bulk)
+  - [Data Generator](#data-generator)
 
 # Bulk Indexing
 
@@ -46,6 +48,8 @@ data = [
 response = client.bulk(data)
 if response["errors"]:
     print(f"There were errors!")
+    for item in response["items"]:
+        print(f"{item['index']['status']}: {item['index']['error']['type']}")
 else:
     print(f"Bulk-inserted {len(response['items'])} items.")
 ```
 
@@ -69,3 +73,57 @@ response = helpers.bulk(client, docs, max_retries=3)
 print(response)
 ```
 
+## Parallel Bulk
+
+Bulk helpers also include `parallel_bulk`, which accepts options to control the chunk size, the maximum chunk byte size, and whether errors and exceptions are raised or reported as results.
+
+```python
+succeeded = []
+failed = []
+for success, item in helpers.parallel_bulk(
+    client,
+    actions=data,
+    chunk_size=10,
+    raise_on_error=False,
+    raise_on_exception=False,
+    max_chunk_bytes=20 * 1024 * 1024,
+    request_timeout=60,
+):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items.")
+```
+
+## Data Generator
+
+Use a data generator function with bulk helpers instead of building arrays in memory.
+
+```python
+def _generate_data():
+    for i in range(100):
+        yield {"_index": index_name, "_id": i, "value": i}
+
+succeeded = []
+failed = []
+for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items.")
+```
\ No newline at end of file
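
An aside on the guide's new section: the examples above leave `parallel_bulk`'s worker pool at its defaults. A minimal tuning sketch, reusing `client` and `data` from the guide (values are illustrative; the `queue_size: int = 4` default is visible in the `opensearchpy/helpers/actions.py` diff below, and `thread_count` is assumed from the same signature, where it also defaults to 4):

```python
# Sketch: sizing parallel_bulk's worker pool; the values shown are the defaults.
for success, item in helpers.parallel_bulk(
    client,
    actions=data,
    thread_count=4,  # worker threads sending bulk requests concurrently
    queue_size=4,    # chunks buffered between the chunker and the pool
):
    if not success:
        print(item)
```
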
diff --git a/opensearchpy/helpers/actions.py b/opensearchpy/helpers/actions.py
index 7f8ced35..960d5a1c 100644
--- a/opensearchpy/helpers/actions.py
+++ b/opensearchpy/helpers/actions.py
@@ -442,6 +442,8 @@ def parallel_bulk(
     max_chunk_bytes: int = 100 * 1024 * 1024,
     queue_size: int = 4,
     expand_action_callback: Any = expand_action,
+    raise_on_exception: bool = True,
+    raise_on_error: bool = True,
     ignore_status: Any = (),
     *args: Any,
     **kwargs: Any
@@ -485,7 +487,14 @@ def _setup_queues(self) -> None:
     for result in pool.imap(
         lambda bulk_chunk: list(
             _process_bulk_chunk(
-                client, bulk_chunk[1], bulk_chunk[0], ignore_status, *args, **kwargs
+                client,
+                bulk_chunk[1],
+                bulk_chunk[0],
+                raise_on_exception,
+                raise_on_error,
+                ignore_status,
+                *args,
+                **kwargs
             )
         ),
         _chunk_actions(
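
The reordered call above is the heart of the fix: `_process_bulk_chunk` takes `raise_on_exception`, `raise_on_error`, and `ignore_status` positionally after the chunk data, so the removed one-line call was passing `ignore_status` in the `raise_on_exception` slot. A sketch of the alignment the new call targets (signature reconstructed from the call sites here and the assertions in the tests below, not copied from the module):

```python
# Reconstructed sketch of the positional order _process_bulk_chunk expects;
# the new tests assert exactly: client, actions, data, True, True, 123.
def _process_bulk_chunk(
    client,                   # 1. the OpenSearch client
    bulk_actions,             # 2. serialized action lines (bulk_chunk[1])
    bulk_data,                # 3. raw (action, data) tuples (bulk_chunk[0])
    raise_on_exception=True,  # 4. the slot ignore_status used to land in
    raise_on_error=True,      # 5.
    ignore_status=(),         # 6.
    *args,
    **kwargs,
):
    ...
```
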
diff --git a/samples/bulk/bulk-helpers.py b/samples/bulk/bulk-helpers.py
index 3dc165c8..678b2c09 100755
--- a/samples/bulk/bulk-helpers.py
+++ b/samples/bulk/bulk-helpers.py
@@ -12,6 +12,7 @@
 
 import os
+from typing import Any
 
 from opensearchpy import OpenSearch, helpers
 
@@ -49,8 +50,57 @@
 for i in range(100):
     data.append({"_index": index_name, "_id": i, "value": i})
 
+# serialized bulk raising an exception on error
 rc = helpers.bulk(client, data)
-print(f"Bulk-inserted {rc[0]} items.")
+print(f"Bulk-inserted {rc[0]} items (bulk).")
+
+# parallel bulk with explicit error checking
+succeeded = []
+failed = []
+for success, item in helpers.parallel_bulk(
+    client,
+    actions=data,
+    chunk_size=10,
+    raise_on_error=False,
+    raise_on_exception=False,
+    max_chunk_bytes=20 * 1024 * 1024,
+    request_timeout=60,
+):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")
+
+
+# streaming bulk with a data generator
+def _generate_data() -> Any:
+    for i in range(100):
+        yield {"_index": index_name, "_id": i, "value": i}
+
+
+succeeded = []
+failed = []
+for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")
 
 # delete index
 client.indices.delete(index=index_name)

diff --git a/test_opensearchpy/test_helpers/test_actions.py b/test_opensearchpy/test_helpers/test_actions.py
index 739e8647..e44dbc98 100644
--- a/test_opensearchpy/test_helpers/test_actions.py
+++ b/test_opensearchpy/test_helpers/test_actions.py
@@ -67,11 +67,62 @@ def test_all_chunks_sent(self, _process_bulk_chunk: Any) -> None:
 
         self.assertEqual(50, mock_process_bulk_chunk.call_count)  # type: ignore
 
+    @mock.patch("opensearchpy.OpenSearch.bulk")
+    def test_with_all_options(self, _bulk: Any) -> None:
+        actions = ({"x": i} for i in range(100))
+        list(
+            helpers.parallel_bulk(
+                OpenSearch(),
+                actions=actions,
+                chunk_size=2,
+                raise_on_error=False,
+                raise_on_exception=False,
+                max_chunk_bytes=20 * 1024 * 1024,
+                request_timeout=160,
+                ignore_status=(123),
+            )
+        )
+
+        self.assertEqual(50, _bulk.call_count)
+        _bulk.assert_called_with(
+            '{"index":{}}\n{"x":98}\n{"index":{}}\n{"x":99}\n', request_timeout=160
+        )
+
+    @mock.patch("opensearchpy.helpers.actions._process_bulk_chunk")
+    def test_process_bulk_chunk_with_all_options(
+        self, _process_bulk_chunk: Any
+    ) -> None:
+        actions = ({"x": i} for i in range(100))
+        client = OpenSearch()
+        list(
+            helpers.parallel_bulk(
+                client,
+                actions=actions,
+                chunk_size=2,
+                raise_on_error=True,
+                raise_on_exception=True,
+                max_chunk_bytes=20 * 1024 * 1024,
+                request_timeout=160,
+                ignore_status=(123),
+            )
+        )
+
+        self.assertEqual(50, _process_bulk_chunk.call_count)
+        _process_bulk_chunk.assert_called_with(
+            client,
+            ['{"index":{}}', '{"x":98}', '{"index":{}}', '{"x":99}'],
+            [({"index": {}}, {"x": 98}), ({"index": {}}, {"x": 99})],
+            True,
+            True,
+            123,
+            request_timeout=160,
+        )
+
     @pytest.mark.skip  # type: ignore
     @mock.patch(
         "opensearchpy.helpers.actions._process_bulk_chunk",
         # make sure we spend some time in the thread
-        side_effect=lambda *a: [
+        side_effect=lambda *args, **kwargs: [
             (True, time.sleep(0.001) or threading.current_thread().ident)  # type: ignore
         ],
     )
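
Taken together, a smoke-test sketch of the fixed call path (hypothetical host and index name, not part of the patch): `parallel_bulk` now declares `raise_on_error` and `raise_on_exception` and forwards them, together with `ignore_status`, to `_process_bulk_chunk`, so a call like the one below no longer trips the `TypeError` referenced in the subject line.

```python
from opensearchpy import OpenSearch, helpers

# Hypothetical local cluster; adjust host and auth for your environment.
client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

docs = ({"_index": "bulk-test", "_id": i, "value": i} for i in range(10))
for success, item in helpers.parallel_bulk(
    client,
    actions=docs,
    raise_on_error=False,      # newly declared in parallel_bulk's signature
    raise_on_exception=False,  # newly declared in parallel_bulk's signature
    ignore_status=(404,),      # now forwarded to the correct positional slot
):
    print(success, item)
```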