Skip to content

Commit d0f54d5

Browse files
committed
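Addressed PR review comments: require Python >= 3.8, validate that --async-requests is greater than zero, move async_requests into Config, and wrap send_buffer coroutines in asyncio tasks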
1 parent 98a40ca commit d0f54d5

File tree

3 files changed

+15
-12
lines changed

3 files changed

+15
-12
lines changed

redisgraph_bulk_loader/bulk_insert.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,23 @@ async def process_entities(entities):
7070
@click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
7171
@click.option('--index', '-i', multiple=True, help='Label:Propery on which to create an index')
7272
@click.option('--full-text-index', '-f', multiple=True, help='Label:Propery on which to create an full text search index')
73-
@click.option('--async-requests', '-A', default=3, help='amount of async requests to be executed in parallel' )
73+
@click.option('--async-requests', '-A', default=3, help='number of async requests to be executed in parallel' )
7474
async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes, nodes_with_label, relations, relations_with_type, separator, enforce_schema, skip_invalid_nodes, skip_invalid_edges, escapechar, quote, max_token_count, max_buffer_size, max_token_size, index, full_text_index, async_requests):
75-
if sys.version_info.major < 3 or sys.version_info.minor < 6:
76-
raise Exception("Python >= 3.6 is required for the RedisGraph bulk loader.")
75+
if sys.version_info.major < 3 or sys.version_info.minor < 8:
76+
raise Exception("Python >= 3.8 is required for the RedisGraph bulk loader.")
7777

7878
if not (any(nodes) or any(nodes_with_label)):
7979
raise Exception("At least one node file must be specified.")
80+
if async_requests <= 0:
81+
raise Exception("The number of async requests must be greater than zero")
8082

8183
start_time = timer()
8284

8385
# If relations are being built, we must store unique node identifiers to later resolve endpoints.
8486
store_node_identifiers = any(relations) or any(relations_with_type)
8587

8688
# Initialize configurations with command-line arguments
87-
config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar)
89+
config = Config(max_token_count, max_buffer_size, max_token_size, enforce_schema, skip_invalid_nodes, skip_invalid_edges, separator, int(quote), store_node_identifiers, escapechar, async_requests)
8890

8991
# Attempt to connect to Redis server
9092
try:
@@ -112,7 +114,7 @@ async def bulk_insert(graph, host, port, password, user, unix_socket_path, nodes
112114
print("Graph with name '%s', could not be created, as Redis key '%s' already exists." % (graph, graph))
113115
sys.exit(1)
114116

115-
query_buf = QueryBuffer(graph, client, config, async_requests)
117+
query_buf = QueryBuffer(graph, client, config)
116118

117119
# Read the header rows of each input CSV and save its schema.
118120
labels = parse_schemas(Label, query_buf, nodes, nodes_with_label, config)

redisgraph_bulk_loader/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
class Config:
2-
def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token_size=512, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\'):
2+
def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token_size=512, enforce_schema=False, skip_invalid_nodes=False, skip_invalid_edges=False, separator=',', quoting=3, store_node_identifiers=False, escapechar='\\', async_requests =3 ):
33
"""Settings for this run of the bulk loader"""
44
# Maximum number of tokens per query
55
# 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
@@ -18,6 +18,7 @@ def __init__(self, max_token_count=1024 * 1023, max_buffer_size=2_048, max_token
1818
self.separator = separator
1919
self.quoting = quoting
2020
self.escapechar = None if escapechar.lower() == "none" else escapechar
21+
self.async_requests = async_requests
2122

2223
# True if we are building relations as well as nodes
2324
self.store_node_identifiers = store_node_identifiers

redisgraph_bulk_loader/query_buffer.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,16 @@ def send_buffer(self, initial_query):
2929
return self.client.execute_command("GRAPH.BULK", self.graphname, *args)
3030

3131
class QueryBuffer:
32-
def __init__(self, graphname, client, config, async_requests):
32+
def __init__(self, graphname, client, config):
3333

3434
self.client = client
3535
self.graphname = graphname
3636
self.config = config
37-
self.async_requests = async_requests
37+
self.async_requests = config.async_requests
3838

3939
# A queue of internal buffers
4040
self.internal_buffers = list()
41-
for i in range(async_requests):
41+
for i in range(self.async_requests):
4242
self.internal_buffers.append(InternalBuffer(graphname, client))
4343
# Each buffer sent to RedisGraph returns an awaitable
4444
self.awaitables = set()
@@ -60,9 +60,9 @@ def __init__(self, graphname, client, config, async_requests):
6060
async def send_buffer(self, flush=False):
6161
# When flushing, wait for all awaitables to complete; otherwise wait only until at least one completes.
6262
return_when_flag = asyncio.ALL_COMPLETED if flush is True else asyncio.FIRST_COMPLETED
63-
awaitable = self.current_buffer.send_buffer(self.initial_query)
64-
if awaitable is not None:
65-
self.awaitables.add(awaitable)
63+
coro = self.current_buffer.send_buffer(self.initial_query)
64+
if coro is not None:
65+
self.awaitables.add(asyncio.create_task(coro))
6666
# Requests are flushed and awaited when:
6767
# 1. Flush is needed.
6868
# 2. Initial query with BEGIN token, to avoid race condition on async RedisGraph servers.

0 commit comments

Comments
 (0)