From 8f98e9963e789ea34add507dea2fca4a672a9eb4 Mon Sep 17 00:00:00 2001 From: Robert Marianski Date: Mon, 4 Dec 2017 17:13:42 -0500 Subject: [PATCH] Add configuration for job definitions and queues --- config.yaml.sample | 9 +++++++ tilequeue/command.py | 62 +++++++++++++++++++++++--------------------- 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/config.yaml.sample b/config.yaml.sample index 9be324e9..7fc6bf89 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -377,3 +377,12 @@ rawr: type: toi # uncomment this to use RAWR tiles rather than go direct to the database. #use-rawr-tiles: true +batch: + # zoom level to enqueue jobs + queue-zoom: 7 + # name of batch job definition + job-definition: + # name of batch queue to use + job-queue: + # optional number of retry attempts, overrides job definition value + retry-attempts: 5 diff --git a/tilequeue/command.py b/tilequeue/command.py index d56ffcc6..2419cb05 100755 --- a/tilequeue/command.py +++ b/tilequeue/command.py @@ -1850,56 +1850,55 @@ def make_statsd_client_from_cfg(cfg): def tilequeue_batch_enqueue(cfg, peripherals): logger = make_logger(cfg, 'batch_enqueue') - rawr_yaml = cfg.yml.get('rawr') - assert rawr_yaml is not None, 'Missing rawr configuration in yaml' - - group_by_zoom = rawr_yaml.get('group-zoom') - assert group_by_zoom is not None, 'Missing group-zoom rawr config' - import boto3 client = boto3.client('batch', region_name='us-east-1') logger.info('Batch enqueue ...') - # enqueue jobs at z7 - # this cuts down on the number of overall jobs that we have - # TODO ideally this would be configurable - enqueue_zoom = 7 - dim = 2 ** enqueue_zoom - z = enqueue_zoom + batch_yaml = cfg.yml.get('batch') + assert batch_yaml, 'Missing batch config' + + queue_zoom = batch_yaml.get('queue-zoom') + assert queue_zoom, 'Missing batch queue-zoom config' + + job_def = batch_yaml.get('job-definition') + assert job_def, 'Missing batch job-definition config' + job_queue = batch_yaml.get('job-queue') + assert job_queue, 'Missing batch job-queue config' + + retry_attempts = batch_yaml.get('retry-attempts') + + dim = 2 ** queue_zoom + z = queue_zoom i = 0 - stop = False for y in xrange(dim): - if stop: - break for x in xrange(dim): coord_str = '%d/%d/%d' % (z, x, y) job_name = 'metatile-process-%d-%d-%d' % (z, x, y) + # TODO maybe passing the tile through an environment variable is + # cleaner, and we won't have to know about where the tilequeue + # binary and config file are in the image job_cmd = [ 'tilequeue', 'batch-process', '--config', '/etc/tilequeue/config.yaml', '--tile', coord_str, ] job_opts = dict( - # TODO - fill these out - jobDefinition='tile-metatile-process', - jobQueue='tile-metatile-queue', + jobDefinition=job_def, + jobQueue=job_queue, jobName=job_name, containerOverrides={ 'command': job_cmd, }, - retryStrategy={ - 'attempts': 5 - } ) + if retry_attempts is not None: + job_opts['retryStrategy'] = dict(attempts=retry_attempts) resp = client.submit_job(**job_opts) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, \ 'Failed to submit job: %s' % 'JobName' i += 1 if i % 10000 == 0: print i - stop = True - break logger.info('Batch enqueue ... done') @@ -1916,13 +1915,19 @@ def tilequeue_batch_process(cfg, args): logger.info('batch process ... start') coord_str = args.tile - # TODO ideally configurable - z7_coord = deserialize_coord(coord_str) - if not z7_coord: + + batch_yaml = cfg.yml.get('batch') + assert batch_yaml, 'Missing batch config' + + queue_zoom = batch_yaml.get('queue-zoom') + assert queue_zoom, 'Missing batch queue-zoom config' + + queue_coord = deserialize_coord(coord_str) + if not queue_coord: print >> sys.stderr, 'Invalid coordinate: %s' % coord_str sys.exit(2) - assert z7_coord.zoom == 7 + assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str logger.info('batch process: %s' % coord_str) @@ -1958,7 +1963,7 @@ def find_z10_coords_for(coord): zoom_stop = cfg.max_zoom formats = lookup_formats(cfg.output_formats) - z10_coords = find_z10_coords_for(z7_coord) + z10_coords = find_z10_coords_for(queue_coord) for z10coord in z10_coords: # each coord here is the z10 unit of work now pyramid_coords = [z10coord] @@ -1984,7 +1989,6 @@ def find_z10_coords_for(coord): tiles = make_metatiles(cfg.metatile_size, formatted_tiles) for tile in tiles: - # TODO write_tile_if_changed? store.write_tile(tile['tile'], tile['coord'], tile['format'], tile['layer'])