Skip to content

Commit

Permalink
Add configuration for job definitions and queues
Browse files Browse the repository at this point in the history
  • Loading branch information
rmarianski committed Dec 4, 2017
1 parent c270944 commit 8f98e99
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
9 changes: 9 additions & 0 deletions config.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,12 @@ rawr:
type: toi
# uncomment this to use RAWR tiles rather than go direct to the database.
#use-rawr-tiles: true
batch:
# zoom level at which batch jobs are enqueued (one job per tile at this zoom)
queue-zoom: 7
# name of batch job definition
job-definition: <job-definition>
# name of batch queue to use
job-queue: <job-queue>
# optional number of retry attempts; when set, overrides the job definition's value
retry-attempts: 5
62 changes: 33 additions & 29 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -1850,56 +1850,55 @@ def make_statsd_client_from_cfg(cfg):
def tilequeue_batch_enqueue(cfg, peripherals):
logger = make_logger(cfg, 'batch_enqueue')

rawr_yaml = cfg.yml.get('rawr')
assert rawr_yaml is not None, 'Missing rawr configuration in yaml'

group_by_zoom = rawr_yaml.get('group-zoom')
assert group_by_zoom is not None, 'Missing group-zoom rawr config'

import boto3
client = boto3.client('batch', region_name='us-east-1')

logger.info('Batch enqueue ...')

# enqueue jobs at z7
# this cuts down on the number of overall jobs that we have
# TODO ideally this would be configurable
enqueue_zoom = 7
dim = 2 ** enqueue_zoom
z = enqueue_zoom
batch_yaml = cfg.yml.get('batch')
assert batch_yaml, 'Missing batch config'

queue_zoom = batch_yaml.get('queue-zoom')
assert queue_zoom, 'Missing batch queue-zoom config'

job_def = batch_yaml.get('job-definition')
assert job_def, 'Missing batch job-definition config'
job_queue = batch_yaml.get('job-queue')
assert job_queue, 'Missing batch job-queue config'

retry_attempts = batch_yaml.get('retry-attempts')

dim = 2 ** queue_zoom
z = queue_zoom
i = 0
stop = False
for y in xrange(dim):
if stop:
break
for x in xrange(dim):
coord_str = '%d/%d/%d' % (z, x, y)
job_name = 'metatile-process-%d-%d-%d' % (z, x, y)
# TODO maybe passing the tile through an environment variable is
# cleaner, and we won't have to know about where the tilequeue
# binary and config file are in the image
job_cmd = [
'tilequeue', 'batch-process',
'--config', '/etc/tilequeue/config.yaml',
'--tile', coord_str,
]
job_opts = dict(
# TODO - fill these out
jobDefinition='tile-metatile-process',
jobQueue='tile-metatile-queue',
jobDefinition=job_def,
jobQueue=job_queue,
jobName=job_name,
containerOverrides={
'command': job_cmd,
},
retryStrategy={
'attempts': 5
}
)
if retry_attempts is not None:
job_opts['retryStrategy'] = dict(attempts=retry_attempts)
resp = client.submit_job(**job_opts)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, \
'Failed to submit job: %s' % 'JobName'
i += 1
if i % 10000 == 0:
print i
stop = True
break

logger.info('Batch enqueue ... done')

Expand All @@ -1916,13 +1915,19 @@ def tilequeue_batch_process(cfg, args):
logger.info('batch process ... start')

coord_str = args.tile
# TODO ideally configurable
z7_coord = deserialize_coord(coord_str)
if not z7_coord:

batch_yaml = cfg.yml.get('batch')
assert batch_yaml, 'Missing batch config'

queue_zoom = batch_yaml.get('queue-zoom')
assert queue_zoom, 'Missing batch queue-zoom config'

queue_coord = deserialize_coord(coord_str)
if not queue_coord:
print >> sys.stderr, 'Invalid coordinate: %s' % coord_str
sys.exit(2)

assert z7_coord.zoom == 7
assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str

logger.info('batch process: %s' % coord_str)

Expand Down Expand Up @@ -1958,7 +1963,7 @@ def find_z10_coords_for(coord):
zoom_stop = cfg.max_zoom
formats = lookup_formats(cfg.output_formats)

z10_coords = find_z10_coords_for(z7_coord)
z10_coords = find_z10_coords_for(queue_coord)
for z10coord in z10_coords:
# each coord here is the z10 unit of work now
pyramid_coords = [z10coord]
Expand All @@ -1984,7 +1989,6 @@ def find_z10_coords_for(coord):

tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
for tile in tiles:
# TODO write_tile_if_changed?
store.write_tile(tile['tile'], tile['coord'], tile['format'],
tile['layer'])

Expand Down

0 comments on commit 8f98e99

Please sign in to comment.