
Commit add015f

Add a bulk ingest example
1 parent 0296999 commit add015f

6 files changed: +172 −5 lines changed

6 files changed

+172
-5
lines changed

docs/helpers.rst

Lines changed: 1 addition & 1 deletion
```diff
@@ -91,7 +91,7 @@ document is like ``{"word": "<myword>"}``.
 
 
 For a more complete and complex example please take a look at
-https://github.com/elastic/elasticsearch-py/blob/master/example/load.py#L76-L130
+https://github.com/elastic/elasticsearch-py/blob/master/examples/bulk-ingest
 
 The :meth:`~elasticsearch.Elasticsearch.parallel_bulk` api is a wrapper around the :meth:`~elasticsearch.Elasticsearch.bulk` api to provide threading. :meth:`~elasticsearch.Elasticsearch.parallel_bulk` returns a generator which must be consumed to produce results.
 
```
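As a quick illustration of that last point (not part of this commit), here is a minimal sketch of consuming the `parallel_bulk()` generator; the cluster address, index name, and documents are assumptions:

```python
# A minimal sketch, not part of this commit: parallel_bulk() is lazy,
# so the generator must be consumed for any indexing to happen.
# The index name and documents below are made up for illustration.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch()  # assumes a cluster reachable on localhost:9200

actions = ({"_index": "my-index", "word": w} for w in ("foo", "bar", "baz"))
for ok, result in parallel_bulk(client, actions, thread_count=4):
    if not ok:
        print("A document failed to index:", result)
```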

elasticsearch/helpers/test.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -50,7 +50,9 @@ def setUpClass(cls):
 
     def tearDown(self):
         super(ElasticsearchTestCase, self).tearDown()
-        self.client.indices.delete(index="*", ignore=404, expand_wildcards="open,closed,hidden")
+        self.client.indices.delete(
+            index="*", ignore=404, expand_wildcards="open,closed,hidden"
+        )
         self.client.indices.delete_template(name="*", ignore=404)
 
     @property
```

examples/bulk-ingest/README.md

Lines changed: 48 additions & 0 deletions
````markdown
# Bulk Ingest

A simple script that shows how to ingest a dataset from a file into Elasticsearch.
The file used in this example is a `.csv`, so each row is turned into a document.

To run this example, install the dependencies with `pip`:

```console
python -m pip install -r requirements.txt
```

and then run the script with Python:

```console
python bulk-ingest.py
```

You should see the script download the dataset into `nyc-restaurants.csv`.

Once all the data is loaded into Elasticsearch you can run queries on the dataset
or create visualizations within Kibana:

```python
import elasticsearch

client = elasticsearch.Elasticsearch()
resp = client.search(
    index="nyc-restaurants",
    size=0,
    body={
        "aggs": {
            "borough": {
                "terms": {"field": "borough"},
                "aggs": {
                    "grades": {"terms": {"field": "grade"}}
                },
            }
        }
    },
)
print(resp)
```
````

examples/bulk-ingest/bulk-ingest.py

Lines changed: 106 additions & 0 deletions
```python
#!/usr/bin/env python
"""Script that downloads a public dataset and streams it to an Elasticsearch cluster"""

import csv
from os.path import abspath, join, dirname, exists

import tqdm
import urllib3
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk


NYC_RESTAURANTS = (
    "https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD"
)
DATASET_PATH = join(dirname(abspath(__file__)), "nyc-restaurants.csv")
CHUNK_SIZE = 16384


def download_dataset():
    """Downloads the public dataset if not locally downloaded
    and returns the number of rows in the .csv file.
    """
    if not exists(DATASET_PATH):
        http = urllib3.PoolManager()
        resp = http.request("GET", NYC_RESTAURANTS, preload_content=False)

        if resp.status != 200:
            raise RuntimeError("Could not download dataset")

        with open(DATASET_PATH, mode="wb") as f:
            chunk = resp.read(CHUNK_SIZE)
            while chunk:
                f.write(chunk)
                chunk = resp.read(CHUNK_SIZE)

    with open(DATASET_PATH) as f:
        return sum(1 for _ in f) - 1


def create_index(client):
    """Creates an index in Elasticsearch if one isn't already there."""
    client.indices.create(
        index="nyc-restaurants",
        body={
            "settings": {"number_of_shards": 1},
            "mappings": {
                "properties": {
                    "name": {"type": "text"},
                    "borough": {"type": "keyword"},
                    "cuisine": {"type": "keyword"},
                    "grade": {"type": "keyword"},
                    "location": {"type": "geo_point"},
                }
            },
        },
        ignore=400,
    )


def generate_actions():
    """Reads the file through csv.DictReader() and for each row
    yields a single document. This function is passed into the bulk()
    helper to create many documents in sequence.
    """
    with open(DATASET_PATH, mode="r") as f:
        reader = csv.DictReader(f)

        for row in reader:
            doc = {
                "_id": row["CAMIS"],
                "name": row["DBA"],
                "borough": row["BORO"],
                "cuisine": row["CUISINE DESCRIPTION"],
                "grade": row["GRADE"] or None,
            }

            lat = row["Latitude"]
            lon = row["Longitude"]
            if lat not in ("", "0") and lon not in ("", "0"):
                doc["location"] = {"lat": float(lat), "lon": float(lon)}
            yield doc


def main():
    print("Loading dataset...")
    number_of_docs = download_dataset()

    client = Elasticsearch(
        # Add your cluster configuration here!
    )
    print("Creating an index...")
    create_index(client)

    print("Indexing documents...")
    progress = tqdm.tqdm(unit="docs", total=number_of_docs)
    successes = 0
    for ok, action in streaming_bulk(
        client=client, index="nyc-restaurants", actions=generate_actions(),
    ):
        progress.update(1)
        successes += ok
    print("Indexed %d/%d documents" % (successes, number_of_docs))


if __name__ == "__main__":
    main()
```
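A side note on the indexing loop above: by default `streaming_bulk()` raises on the first document that fails to index, so the `ok` flag is normally always true. A hedged sketch (not part of this commit) of collecting failures instead, via the helper's `raise_on_error` flag:

```python
# A sketch, not part of this commit: with raise_on_error=False the
# helper yields (False, info) for failed documents instead of raising.
for ok, result in streaming_bulk(
    client=client,
    index="nyc-restaurants",
    actions=generate_actions(),
    raise_on_error=False,
):
    if not ok:
        print("Failed to index a document:", result)
```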

examples/bulk-ingest/requirements.txt

Lines changed: 3 additions & 0 deletions
```
elasticsearch
urllib3
tqdm
```

tox.ini

Lines changed: 11 additions & 3 deletions
```diff
@@ -7,22 +7,30 @@ setenv =
 commands =
     python setup.py test
 
-[testenv:lint]
+[testenv:blacken]
 deps =
-    flake8
     black
 commands =
     black --target-version=py27 \
         elasticsearch/ \
         test_elasticsearch/ \
+        examples/ \
         setup.py
+
+[testenv:lint]
+deps =
+    flake8
+    black
+commands =
     black --target-version=py27 --check \
         elasticsearch/ \
         test_elasticsearch/ \
+        examples/ \
         setup.py
     flake8 \
         elasticsearch/ \
-        test_elasticsearch/
+        test_elasticsearch/ \
+        examples/
 
 [testenv:docs]
 deps =
```
