Skip to content

Commit 2a5a641

Browse files
Bulk update (#53)
* WIP * Add bulk update utility and entrypoint * Address PR comments * Prefer joining arrays in string concatenation * Address PR comments
1 parent 745d4a7 commit 2a5a641

File tree

5 files changed

+502
-6
lines changed

5 files changed

+502
-6
lines changed

README.md

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ pip install git+https://github.com/RedisGraph/redisgraph-bulk-loader.git@master
2727
```
2828

2929
## Usage
30-
Pip installation exposes `redisgraph-bulk-insert` as a command to invoke this tool:
30+
Pip installation exposes `redisgraph-bulk-loader` as a command to invoke this tool:
3131
```
32-
redisgraph-bulk-insert GRAPHNAME [OPTIONS]
32+
redisgraph-bulk-loader GRAPHNAME [OPTIONS]
3333
```
3434

3535
Installation by cloning the repository allows the script to be invoked via Python like so:
@@ -63,7 +63,7 @@ The only required arguments are the name to give the newly-created graph (which
6363
The nodes and relationship flags should be specified once per input file.
6464

6565
```
66-
redisgraph-bulk-insert GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r example/KNOWS.csv -r example/VISITED.csv
66+
redisgraph-bulk-loader GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r example/KNOWS.csv -r example/VISITED.csv
6767
```
6868
The label (for nodes) or relationship type (for relationships) is derived from the base name of the input CSV file. In this example, we'll construct two sets of nodes, labeled `Person` and `Country`, and two types of relationships - `KNOWS` and `VISITED`.
6969

@@ -172,3 +172,36 @@ Inserting these CSVs with the command:
172172

173173
Will produce a graph named SocialGraph with 2 users, Jeffrey and Filipe. Jeffrey follows Filipe, and that relation has a reaction_count of 25. Filipe also follows Jeffrey, with a reaction_count of 10.
174174

175+
## Performing bulk updates
176+
Pip installation also exposes the command `redisgraph-bulk-update`:
177+
```
178+
redisgraph-bulk-update GRAPHNAME [OPTIONS]
179+
```
180+
181+
Installation by cloning the repository allows the bulk updater to be invoked via Python like so:
182+
```
183+
python3 redisgraph_bulk_loader/bulk_update.py GRAPHNAME [OPTIONS]
184+
```
185+
186+
| Flags | Extended flags | Parameter |
187+
|:-----:|--------------------------|:----------------------------------------------------------:|
188+
| -h | --host TEXT | Redis server host (default: 127.0.0.1) |
189+
| -p | --port INTEGER | Redis server port (default: 6379) |
190+
| -a | --password TEXT | Redis server password (default: none) |
191+
| -u | --unix-socket-path TEXT | Redis unix socket path (default: none) |
192+
| -q | --query TEXT | Query to run on server |
193+
| -v | --variable-name TEXT | Variable name for row array in queries (default: row) |
194+
| -c | --csv TEXT | Path to CSV input file |
195+
| -o | --separator TEXT | Field token separator in CSV file |
196+
| -n | --no-header | If set, the CSV file has no header |
197+
| -t | --max-token-size INTEGER | Max size of each token in megabytes (default 500, max 512) |
198+
199+
The bulk updater allows a CSV file to be read in batches and committed to RedisGraph according to the provided query.
200+
201+
For example, given the CSV files described in [Input Schema CSV examples](#input-schema-csv-examples), the bulk updater could create the same nodes and relationships with the commands:
202+
```
203+
redisgraph-bulk-update SocialGraph --csv User.csv --query "MERGE (:User {id: row[0], name: row[1], rank: row[2]})"
204+
redisgraph-bulk-update SocialGraph --csv FOLLOWS.csv --query "MATCH (start {id: row[0]}), (end {id: row[1]}) MERGE (start)-[f:FOLLOWS]->(end) SET f.reaction_count = row[2]"
205+
```
206+
207+
When using the bulk updater, it is essential to sanitize CSV inputs beforehand, as RedisGraph *will* commit changes to the graph incrementally. As such, malformed inputs may leave the graph in a partially-updated state.

redisgraph_bulk_loader/bulk_update.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import sys
2+
import csv
3+
import redis
4+
import click
5+
from redisgraph import Graph
6+
from timeit import default_timer as timer
7+
8+
9+
def utf8len(s):
    """Return the size of *s* in bytes when encoded as UTF-8."""
    return len(bytes(s, 'utf-8'))
11+
12+
13+
# Count number of rows in file.
def count_entities(filename):
    """Return the number of lines in *filename* (header included, if any)."""
    with open(filename, 'rt') as infile:
        return sum(1 for _ in infile)
19+
20+
21+
class BulkUpdate:
    """Handler class for emitting bulk update commands.

    Reads a CSV file in token-size-bounded batches and runs the user's
    query once per batch, binding each row via an UNWIND clause.
    """
    def __init__(self, graph_name, max_token_size, separator, no_header, filename, query, variable_name, client):
        self.separator = separator
        self.no_header = no_header
        # Prepend UNWIND so each buffered row is bound to `variable_name` in the user query.
        self.query = " ".join(["UNWIND $rows AS", variable_name, query])
        self.buffer_size = 0
        # Reserve space for the query text itself within the max token budget.
        self.max_token_size = max_token_size * 1024 * 1024 - utf8len(self.query)
        self.filename = filename
        self.graph_name = graph_name
        self.graph = Graph(graph_name, client)
        self.statistics = {}

    def update_statistics(self, result):
        """Accumulate the per-query statistics reported by the server."""
        for key, new_val in result.statistics.items():
            self.statistics[key] = self.statistics.get(key, 0) + new_val

    def emit_buffer(self, rows):
        """Run the update query against one batch of rows and record its statistics."""
        command = " ".join([rows, self.query])
        result = self.graph.query(command)
        self.update_statistics(result)

    def quote_string(self, cell):
        """Return *cell* stripped and double-quoted, unless it already reads as a
        number, boolean, array, or quoted string."""
        cell = cell.strip()
        # An empty field becomes an empty string literal.
        # (Previously cell[0] raised IndexError on empty input.)
        if not cell:
            return "\"\""
        # Quote-interpolate cell if it is an unquoted string.
        try:
            float(cell)  # Check for numeric
        except ValueError:
            # Bug fix: the array check compared the bound method `cell.lower`
            # (always != ']') instead of the last character `cell[-1]`.
            if ((cell.lower() != 'false' and cell.lower() != 'true') and  # Check for boolean
                    (cell[0] != '[' and cell[-1] != ']') and              # Check for array
                    (cell[0] != "\"" and cell[-1] != "\"") and            # Check for double-quoted string
                    (cell[0] != "\'" and cell[-1] != "\'")):              # Check for single-quoted string
                cell = "".join(["\"", cell, "\""])
        return cell

    # Raise an exception if the query triggers a compile-time error
    def validate_query(self):
        command = " ".join(["CYPHER rows=[]", self.query])
        # The plan call will raise an error if the query is malformed or invalid.
        self.graph.execution_plan(command)

    def process_update_csv(self):
        """Stream the CSV file, batching rows up to max_token_size per query."""
        entity_count = count_entities(self.filename)

        with open(self.filename, 'rt') as f:
            if self.no_header is False:
                next(f)  # skip header

            reader = csv.reader(f, delimiter=self.separator, skipinitialspace=True, quoting=csv.QUOTE_NONE, escapechar='\\')

            rows_strs = []
            with click.progressbar(reader, length=entity_count, label=self.graph_name) as reader:
                for row in reader:
                    # Prepare the string representation of the current row.
                    row = ",".join([self.quote_string(cell) for cell in row])
                    next_line = "".join(["[", row.strip(), "]"])

                    # Emit buffer now if the max token size would be exceeded by this addition.
                    added_size = utf8len(next_line) + 1  # Add one to compensate for the added comma.
                    if self.buffer_size + added_size > self.max_token_size:
                        # Concatenate all rows into a valid parameter set
                        buf = "".join(["CYPHER rows=[", ",".join(rows_strs), "]"])
                        self.emit_buffer(buf)
                        rows_strs = []
                        self.buffer_size = 0

                    # Concatenate the string into the rows string representation.
                    rows_strs.append(next_line)
                    self.buffer_size += added_size
            # Concatenate all rows into a valid parameter set
            buf = "".join(["CYPHER rows=[", ",".join(rows_strs), "]"])
            self.emit_buffer(buf)
98+
99+
100+
################################################################################
# Bulk updater
################################################################################
# Command-line arguments
@click.command()
@click.argument('graph')
# Redis server connection settings
@click.option('--host', '-h', default='127.0.0.1', help='Redis server host')
@click.option('--port', '-p', default=6379, help='Redis server port')
@click.option('--password', '-a', default=None, help='Redis server password')
@click.option('--unix-socket-path', '-u', default=None, help='Redis server unix socket path')
# Cypher query options
@click.option('--query', '-q', help='Query to run on server')
@click.option('--variable-name', '-v', default='row', help='Variable name for row array in queries (default: row)')
# CSV file options
@click.option('--csv', '-c', help='Path to CSV input file')
@click.option('--separator', '-o', default=',', help='Field token separator in CSV file')
@click.option('--no-header', '-n', default=False, is_flag=True, help='If set, the CSV file has no header')
# Buffer size restrictions
@click.option('--max-token-size', '-t', default=500, help='Max size of each token in megabytes (default 500, max 512)')
def bulk_update(graph, host, port, password, unix_socket_path, query, variable_name, csv, separator, no_header, max_token_size):
    """CLI entry point: connect to Redis and run the batched bulk update."""
    if sys.version_info[0] < 3:
        raise Exception("Python 3 is required for the RedisGraph bulk updater.")

    start_time = timer()

    # Attempt to connect to Redis server
    try:
        connection_kwargs = {'password': password, 'decode_responses': True}
        if unix_socket_path is not None:
            connection_kwargs['unix_socket_path'] = unix_socket_path
        else:
            connection_kwargs['host'] = host
            connection_kwargs['port'] = port
        client = redis.StrictRedis(**connection_kwargs)
    except redis.exceptions.ConnectionError as conn_err:
        print("Could not connect to Redis server.")
        raise conn_err

    # Attempt to verify that RedisGraph module is loaded
    try:
        loaded_modules = client.execute_command("MODULE LIST")
    except redis.exceptions.ResponseError:
        # Ignore check if the connected server does not support the "MODULE LIST" command
        pass
    else:
        if not any('graph' in module_description for module_description in loaded_modules):
            print("RedisGraph module not loaded on connected server.")
            sys.exit(1)

    updater = BulkUpdate(graph, max_token_size, separator, no_header, csv, query, variable_name, client)
    updater.validate_query()
    updater.process_update_csv()

    end_time = timer()

    # Report aggregated server-side statistics, then total elapsed time.
    for stat_name, stat_value in updater.statistics.items():
        print(stat_name + ": " + repr(stat_value))
    print("Update of graph '%s' complete in %f seconds" % (graph, end_time - start_time))
155+
156+
157+
# Allow direct invocation: python3 redisgraph_bulk_loader/bulk_update.py GRAPHNAME [OPTIONS]
if __name__ == '__main__':
    bulk_update()

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ def read_all(f):
3434
entry_points='''
3535
[console_scripts]
3636
redisgraph-bulk-loader=redisgraph_bulk_loader.bulk_insert:bulk_insert
37+
redisgraph-bulk-update=redisgraph_bulk_loader.bulk_update:bulk_update
3738
'''
3839
)

test/test_bulk_loader.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,6 @@ def test17_ensure_index_is_created(self):
682682
self.assertIn('2 nodes created', res.output)
683683
self.assertIn('Indices created: 1', res.output)
684684

685-
graph = Graph(graphname, self.redis_con)
686685
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
687686
res = r.execute_command("GRAPH.EXPLAIN", graphname, 'MATCH (p:Person) WHERE p.age > 16 RETURN p')
688687
self.assertIn(' Index Scan | (p:Person)', res)
@@ -710,12 +709,11 @@ def test18_ensure_full_text_index_is_created(self):
710709

711710
graph = Graph(graphname, self.redis_con)
712711
query_result = graph.query("CALL db.idx.fulltext.queryNodes('Monkeys', 'tamarin') YIELD node RETURN node.name")
713-
expected_result = [ ['Emperor Tamarin'],['Golden Lion Tamarin'], ['Cotton-top Tamarin'] ]
712+
expected_result = [['Emperor Tamarin'], ['Golden Lion Tamarin'], ['Cotton-top Tamarin']]
714713

715714
# We should find only the tamarins
716715
self.assertEqual(query_result.result_set, expected_result)
717716

718717

719-
720718
if __name__ == '__main__':
721719
unittest.main()

0 commit comments

Comments
 (0)