Skip to content

Commit

Permalink
Added bulk samples and explained error handling. (#448)
Browse files Browse the repository at this point in the history
* Added bulk samples and explained error handling.

Signed-off-by: dblock <dblock@amazon.com>

* The client can serialize an array for you.

Signed-off-by: dblock <dblock@amazon.com>

---------

Signed-off-by: dblock <dblock@amazon.com>
  • Loading branch information
dblock authored Jul 26, 2023
1 parent f54973e commit 54a517a
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 15 deletions.
55 changes: 40 additions & 15 deletions guides/bulk.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
- [Bulk Indexing](#bulk-indexing)
- [Use a Helper](#use-a-helper)
- [Line-Delimited JSON](#line-delimited-json)
- [Bulk Helper](#bulk-helper)

# Bulk Indexing

The [Bulk API](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/) lets you add, update, or delete multiple documents in a single request.

## Line-Delimited JSON

The `bulk` API accepts line-delimited JSON. This method requires the caller to evaluate the return value and parse errors in the case of a failure or partial success. See [samples/bulk/bulk-ld.py](../samples/bulk/bulk-ld.py) for a working sample.

```python
from opensearchpy import OpenSearch

Expand All @@ -20,27 +25,47 @@ docs = '''
'''

response = client.bulk(docs)
print(response)
# The caller must inspect the return value: `errors` is set when the
# request failed or only partially succeeded.
if response["errors"]:
    print("There were errors!")
else:
    # One entry is reported in `items` for every action that was sent.
    print(f"Bulk-inserted {len(response['items'])} items.")
```

## Use a Helper
The client can also serialize an array of data into line-delimited JSON for you. See [samples/bulk/bulk-array.py](../samples/bulk/bulk-array.py) for a working sample.

```python
# Action lines and document lines alternate: each `index` action is
# immediately followed by the document it applies to.
data = [
    { "index": { "_index": "index-2022-06-08", "_id": 1 }},
    { "name": "foo"},
    { "index": { "_index": "index-2022-06-09", "_id": 2 }},
    { "name": "bar"},
    { "index": { "_index": "index-2022-06-10", "_id": 3 }},
    { "name": "baz"},
]

response = client.bulk(data)
# The caller must inspect the return value: `errors` is set when the
# request failed or only partially succeeded.
if response["errors"]:
    print("There were errors!")
else:
    print(f"Bulk-inserted {len(response['items'])} items.")
```

## Bulk Helper

A helper can generate the line-delimited JSON for you from a Python array of documents that each contain `_index` and `_id` fields, and parse the errors in the response. The `helpers.bulk` implementation raises `BulkIndexError` if any error occurs, which may still indicate a partially successful result. See [samples/bulk/bulk-helpers.py](../samples/bulk/bulk-helpers.py) for a working sample.

```python
from opensearchpy import OpenSearch, helpers

client = OpenSearch(...)

docs = []


def generate_data():
    """Fill the module-level ``docs`` list with one document per word.

    Each document carries its target ``_index``, its ``_id`` (the word's
    position in the list) and the ``word`` itself.

    The shared list is reset before it is filled, so calling this function
    more than once does not accumulate duplicate documents.

    Returns:
        The module-level ``docs`` list, holding exactly one document per word.
    """
    mywords = ['foo', 'bar', 'baz']
    # Reset in place so existing references to ``docs`` stay valid.
    docs.clear()
    docs.extend(
        {
            "_index": "mywords",
            "word": word,
            "_id": index
        }
        for index, word in enumerate(mywords)
    )
    return docs

response = helpers.bulk(client, generate_data(), max_retries=3)
# Every document names its own target `_index` and `_id`; the remaining
# keys (here `word`) form the document body. Keys must be quoted strings.
docs = [
    { "_index": "words", "_id": "word1", "word": "foo" },
    { "_index": "words", "_id": "word2", "word": "bar" },
    { "_index": "words", "_id": "word3", "word": "baz" },
]

# `helpers.bulk` raises `BulkIndexError` if any error occurs.
response = helpers.bulk(client, docs, max_retries=3)
print(response)
```

64 changes: 64 additions & 0 deletions samples/bulk/bulk-array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import json

from opensearchpy import OpenSearch

# Sample: bulk-index 100 documents by handing client.bulk() a Python list of
# alternating action/document entries, report any per-item failures, then
# delete the index again.

# connect to an instance of OpenSearch

host = os.getenv('HOST', default='localhost')
port = int(os.getenv('PORT', 9200))
auth = (
    os.getenv('USERNAME', 'admin'),
    os.getenv('PASSWORD', 'admin')
)

client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_auth=auth,
    use_ssl=True,
    # NOTE(review): certificate verification is switched off here, which is
    # presumably only acceptable for a local demo cluster - confirm before reuse.
    verify_certs=False,
    ssl_show_warn=False
)

# create the index unless it already exists
index_name = "my-index"

if not client.indices.exists(index_name):
    client.indices.create(
        index_name,
        body={
            "mappings": {
                "properties": {
                    "value": {
                        "type": "float"
                    },
                }
            }
        }
    )

# index data: every document is preceded by the "index" action describing it
data = []
for i in range(100):
    data.append({"index": {"_index": index_name, "_id": i}})
    data.append({"value": i})

rc = client.bulk(data)
if rc["errors"]:
    print("There were errors:")
    for item in rc["items"]:
        result = item["index"]
        # Only failed actions carry an "error" object. On a partially
        # successful request the successful items must be skipped, otherwise
        # the lookup below raises KeyError.
        if "error" in result:
            print(f"{result['status']}: {result['error']['type']}")
else:
    print(f"Bulk-inserted {len(rc['items'])} items.")

# delete index
client.indices.delete(index=index_name)

58 changes: 58 additions & 0 deletions samples/bulk/bulk-helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import json

from opensearchpy import OpenSearch, helpers

# Sample: bulk-index 100 documents through helpers.bulk(), print the count
# it reports, then delete the index again.

# connection settings are read from the environment, with local defaults
host = os.environ.get('HOST', 'localhost')
port = int(os.environ.get('PORT', 9200))
auth = (os.environ.get('USERNAME', 'admin'), os.environ.get('PASSWORD', 'admin'))

client = OpenSearch(
    hosts=[dict(host=host, port=port)],
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
)

# make sure the target index is there before indexing into it
index_name = "my-index"

index_exists = client.indices.exists(index_name)
if not index_exists:
    float_field = {"type": "float"}
    client.indices.create(
        index_name,
        body={"mappings": {"properties": {"value": float_field}}},
    )

# one flat dict per document: index name and id sit next to the document field
data = [{"_index": index_name, "_id": i, "value": i} for i in range(100)]

rc = helpers.bulk(client, data)
# the first element of the helper's result is reported as the item count
print(f"Bulk-inserted {rc[0]} items.")

# clean up: remove the index created above
client.indices.delete(index=index_name)

64 changes: 64 additions & 0 deletions samples/bulk/bulk-ld.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import json

from opensearchpy import OpenSearch

# Sample: bulk-index 100 documents by sending client.bulk() a hand-built
# line-delimited JSON string, report any per-item failures, then delete the
# index again.

# connect to an instance of OpenSearch

host = os.getenv('HOST', default='localhost')
port = int(os.getenv('PORT', 9200))
auth = (
    os.getenv('USERNAME', 'admin'),
    os.getenv('PASSWORD', 'admin')
)

client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_auth=auth,
    use_ssl=True,
    # NOTE(review): certificate verification is switched off here, which is
    # presumably only acceptable for a local demo cluster - confirm before reuse.
    verify_certs=False,
    ssl_show_warn=False
)

# create the index unless it already exists
index_name = "my-index"

if not client.indices.exists(index_name):
    client.indices.create(
        index_name,
        body={
            "mappings": {
                "properties": {
                    "value": {
                        "type": "float"
                    },
                }
            }
        }
    )

# index data: one JSON action line followed by one JSON document line
lines = []
for i in range(100):
    lines.append(json.dumps({"index": {"_index": index_name, "_id": i}}))
    lines.append(json.dumps({"value": i}))
# Every line, including the last one, is terminated by a newline.
data = "\n".join(lines) + "\n"

rc = client.bulk(data)
if rc["errors"]:
    print("There were errors:")
    for item in rc["items"]:
        result = item["index"]
        # Only failed actions carry an "error" object. On a partially
        # successful request the successful items must be skipped, otherwise
        # the lookup below raises KeyError.
        if "error" in result:
            print(f"{result['status']}: {result['error']['type']}")
else:
    print(f"Bulk-inserted {len(rc['items'])} items.")

# delete index
client.indices.delete(index=index_name)

0 comments on commit 54a517a

Please sign in to comment.