Skip to content

Commit

Permalink
Add option --import-tool to make it possible to run osmosis + psql in…
Browse files Browse the repository at this point in the history
… parallel

With --import-tool=osmosis-parallel, osmosis is dumping all tables in named
fifos, that are directly read by COPY used by multiple psql processes.
  • Loading branch information
jocelynj committed Jul 1, 2021
1 parent 60a1c6c commit 56801c2
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
49 changes: 47 additions & 2 deletions modules/OsmOsisManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import shutil
import datetime
import time
import subprocess
try: # osmium still optional for now
import osmium # type: ignore
except:
Expand Down Expand Up @@ -169,6 +170,13 @@ def init_database(self, conf):
self.psql_f(script)

# data
if conf.import_tool == "osmosis-parallel":
# Run osmosis and psql import in parallel, thanks to named fifos to send
# generated tables directly to COPY functions.
parallel = True
else:
parallel = False

self.logger.log(self.logger.log_av_r+"import osmosis data"+self.logger.log_ap)
cmd = [conf.bin_osmosis]
dst_ext = os.path.splitext(conf.download["dst"])[1]
Expand All @@ -182,14 +190,51 @@ def init_database(self, conf):
cmd += ["-quiet"]
cmd += ["--write-pgsql-dump", "directory=%s" % dir_country_tmp, "enableLinestringBuilder=yes"]

if parallel:
for f in ['nodes.txt',
'ways.txt', 'way_nodes.txt',
'relations.txt', 'relation_members.txt',
'users.txt']:
os.mkfifo(os.path.join(dir_country_tmp, f))

try:
self.logger.execute_err(cmd)
bg_proc = []

bg_proc.append((self.logger.execute_err(cmd, background=parallel), "osmosis"))
if parallel:
# Reading stdout/stderr must not block
os.set_blocking(bg_proc[-1][0].stdout.fileno(), False)
os.set_blocking(bg_proc[-1][0].stderr.fileno(), False)

for script in conf.osmosis_import_prepare_scripts:
self.psql_f(script, cwd=dir_country_tmp)

for script in conf.osmosis_import_scripts:
self.psql_f(script, cwd=dir_country_tmp)
bg_proc.append((self.psql_f(script, cwd=dir_country_tmp, background=parallel), os.path.basename(script)))
if parallel:
os.set_blocking(bg_proc[-1][0].stdout.fileno(), False)
os.set_blocking(bg_proc[-1][0].stderr.fileno(), False)

if parallel:
# Wait for all background processes, and get their stdout/stderr messages
while bg_proc:
new_bg_proc = bg_proc[:]
for (p, name) in bg_proc:
try:
for line in p.stdout:
self.logger.log(" " + name + ": " + line.decode('utf-8').strip())
for line in p.stderr:
self.logger.log(" " + name + ": " + line.decode('utf-8').strip())
# Need timeout so that we don't have another process blocked
# while waiting for stdout/stderr to be emptied.
p.wait(timeout=1)
self.logger.log(" " + name + ": end")
except subprocess.TimeoutExpired:
continue
if p.returncode:
raise RuntimeError("'%s' exited with status %s" % (' '.join(p.args), repr(p.returncode)))
new_bg_proc.remove((p, name))
bg_proc = new_bg_proc

finally:
# clean even in case of an exception
Expand Down
3 changes: 3 additions & 0 deletions osmose_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,9 @@ def main(options):
parser.add_option("--extract-update-tool", dest="pbf_update_tool", action="store", default="osmosis",
help="Use \"osmosis\" (default) or \"osmium\" to update the OSM extract")

parser.add_option("--import-tool", dest="import_tool", action="store", default="osmosis",
help="Use \"osmosis\" (default) or \"osmosis-parallel\" to import to postgresql database")

parser.add_option("--version", dest="version", action="store_true",
help="Output version information and exit")

Expand Down

0 comments on commit 56801c2

Please sign in to comment.