Skip to content

Commit

Permalink
Merge pull request #315 from tilezen/batch
Browse files Browse the repository at this point in the history
Add batch enqueue/process commands
  • Loading branch information
rmarianski authored Dec 4, 2017
2 parents d987115 + ec33b53 commit 44d5fe6
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 1 deletion.
9 changes: 9 additions & 0 deletions config.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,12 @@ rawr:
type: toi
# uncomment this to use RAWR tiles rather than go direct to the database.
#use-rawr-tiles: true
batch:
# zoom level at which jobs are enqueued (one batch job per tile at this zoom)
queue-zoom: 7
# name of batch job definition
job-definition: <job-definition>
# name of batch queue to use
job-queue: <job-queue>
# optional number of retry attempts, overrides job definition value
retry-attempts: 5
167 changes: 166 additions & 1 deletion tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def make_queue_mapper(queue_mapper_yaml, tile_queue_name_map, toi):
queue_mapper_type = queue_mapper_yaml.get('type')
assert queue_mapper_type, 'Missing queue mapper type'
if queue_mapper_type == 'single':
queue_name = queue_mapper_yaml.get('queue-name')
queue_name = queue_mapper_yaml.get('name')
assert queue_name, 'Missing queue name in queue mapper config'
tile_queue = tile_queue_name_map.get(queue_name)
assert tile_queue, 'No queue found in mapping for %s' % queue_name
Expand Down Expand Up @@ -1847,6 +1847,163 @@ def make_statsd_client_from_cfg(cfg):
return stats


def tilequeue_batch_enqueue(cfg, peripherals):
    """Submit one AWS Batch job for every tile at the configured queue zoom.

    Reads the ``batch`` section of the yaml config (``queue-zoom``,
    ``job-definition``, ``job-queue`` and the optional ``retry-attempts``)
    and, for each z/x/y tile at ``queue-zoom``, submits a job whose
    container command runs ``tilequeue batch-process --tile z/x/y``.

    :param cfg: tilequeue configuration object; only ``cfg.yml`` is read
        here (it is also handed to ``make_logger``).
    :param peripherals: unused; accepted so the function matches the
        signature of the other config/peripherals commands.
    :raises AssertionError: if required batch configuration is missing or
        a job submission does not come back with an HTTP 200 status.
    """
    logger = make_logger(cfg, 'batch_enqueue')

    # imported lazily so that boto3 is only needed by this command
    import boto3
    # NOTE: the region is hard-coded here
    client = boto3.client('batch', region_name='us-east-1')

    logger.info('Batch enqueue ...')

    batch_yaml = cfg.yml.get('batch')
    assert batch_yaml, 'Missing batch config'

    queue_zoom = batch_yaml.get('queue-zoom')
    # compare against None explicitly: zoom 0 is a valid (falsy) value
    assert queue_zoom is not None, 'Missing batch queue-zoom config'

    job_def = batch_yaml.get('job-definition')
    assert job_def, 'Missing batch job-definition config'
    job_queue = batch_yaml.get('job-queue')
    assert job_queue, 'Missing batch job-queue config'

    # optional; when absent the job definition's own retry value applies
    retry_attempts = batch_yaml.get('retry-attempts')

    # there are 2^z columns and 2^z rows of tiles at zoom z
    dim = 2 ** queue_zoom
    z = queue_zoom
    n_submitted = 0
    for y in xrange(dim):
        for x in xrange(dim):
            coord_str = '%d/%d/%d' % (z, x, y)
            job_name = 'metatile-process-%d-%d-%d' % (z, x, y)
            # TODO maybe passing the tile through an environment variable is
            # cleaner, and we won't have to know about where the tilequeue
            # binary and config file are in the image
            job_cmd = [
                'tilequeue', 'batch-process',
                '--config', '/etc/tilequeue/config.yaml',
                '--tile', coord_str,
            ]
            job_opts = dict(
                jobDefinition=job_def,
                jobQueue=job_queue,
                jobName=job_name,
                containerOverrides={
                    'command': job_cmd,
                },
            )
            if retry_attempts is not None:
                job_opts['retryStrategy'] = dict(attempts=retry_attempts)
            resp = client.submit_job(**job_opts)
            # report the name of the job that actually failed
            assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, \
                'Failed to submit job: %s' % job_name
            n_submitted += 1
            # periodic progress report for long enqueue runs
            if n_submitted % 10000 == 0:
                logger.info(
                    'Batch enqueue ... %d jobs submitted' % n_submitted)

    logger.info('Batch enqueue ... done')


def tilequeue_batch_process(cfg, args):
    """Process every tile beneath a single queue-zoom tile and store it.

    The tile given by ``args.tile`` ("z/x/y") must be at the ``queue-zoom``
    set in the ``batch`` config. It is expanded into its descendant
    coordinates at the rawr ``group-zoom``; each of those, together with
    its children down to ``cfg.max_zoom``, is fetched, processed, packed
    into metatiles and written to the configured store.

    :param cfg: tilequeue configuration object.
    :param args: parsed command-line arguments; ``args.tile`` is read.
    :raises AssertionError: if required configuration is missing or the
        tile is not at the configured queue zoom.
    :raises SystemExit: with status 2 if ``args.tile`` cannot be parsed.
    """
    from tilequeue.metatile import make_metatiles

    logger = make_logger(cfg, 'batch_process')

    # TODO log json

    store = _make_store(cfg)

    logger.info('batch process ... start')

    coord_str = args.tile

    batch_yaml = cfg.yml.get('batch')
    assert batch_yaml, 'Missing batch config'

    queue_zoom = batch_yaml.get('queue-zoom')
    # compare against None explicitly: zoom 0 is a valid (falsy) value
    assert queue_zoom is not None, 'Missing batch queue-zoom config'

    queue_coord = deserialize_coord(coord_str)
    if not queue_coord:
        sys.stderr.write('Invalid coordinate: %s\n' % coord_str)
        sys.exit(2)

    assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str

    logger.info('batch process: %s' % coord_str)

    # TODO generalize and move to tile.py?
    def find_job_coords_for(coord, target_zoom):
        """Yield every descendant of coord at target_zoom, row by row."""
        assert target_zoom > coord.zoom
        # each zoom level doubles the number of tiles along each axis
        scale = 2 ** (target_zoom - coord.zoom)
        xmin = coord.column * scale
        ymin = coord.row * scale
        for y in xrange(ymin, ymin + scale):
            for x in xrange(xmin, xmin + scale):
                # label the coordinate with the zoom it was computed for,
                # rather than a fixed zoom of 10
                yield Coordinate(zoom=target_zoom, column=x, row=y)

    with open(cfg.query_cfg) as query_cfg_fp:
        # NOTE(review): yaml.load can construct arbitrary objects; this is
        # an operator-supplied config file, but consider yaml.safe_load.
        query_cfg = yaml.load(query_cfg_fp)

    all_layer_data, layer_data, post_process_data = (
        parse_layer_data(
            query_cfg, cfg.buffer_cfg, os.path.dirname(cfg.query_cfg)))

    output_calc_mapping = make_output_calc_mapping(cfg.process_yaml_cfg)
    # one io thread per layer
    io_pool = ThreadPool(len(layer_data))

    data_fetcher = make_data_fetcher(cfg, layer_data, query_cfg, io_pool)

    rawr_yaml = cfg.yml.get('rawr')
    assert rawr_yaml is not None, 'Missing rawr configuration in yaml'

    group_by_zoom = rawr_yaml.get('group-zoom')
    assert group_by_zoom is not None, 'Missing group-zoom rawr config'

    # NOTE: max_zoom looks to be inclusive
    zoom_stop = cfg.max_zoom
    assert zoom_stop > group_by_zoom
    formats = lookup_formats(cfg.output_formats)

    job_coords = find_job_coords_for(queue_coord, group_by_zoom)
    for job_coord in job_coords:
        # each coord here is the unit of work now
        pyramid_coords = [job_coord]
        pyramid_coords.extend(coord_children_range(job_coord, zoom_stop))
        coord_data = [dict(coord=c) for c in pyramid_coords]
        for fetch, coord_datum in data_fetcher.fetch_tiles(coord_data):
            coord = coord_datum['coord']
            nominal_zoom = coord.zoom + cfg.metatile_zoom
            unpadded_bounds = coord_to_mercator_bounds(coord)
            source_rows = fetch(nominal_zoom, unpadded_bounds)
            feature_layers = convert_source_data_to_feature_layers(
                source_rows, layer_data, unpadded_bounds, coord.zoom)

            # when metatiling, also cut the tiles down to the nominal zoom
            cut_coords = [coord]
            if nominal_zoom > coord.zoom:
                cut_coords.extend(coord_children_range(coord, nominal_zoom))

            formatted_tiles, extra_data = process_coord(
                coord, coord.zoom, feature_layers, post_process_data, formats,
                unpadded_bounds, cut_coords, cfg.buffer_cfg,
                output_calc_mapping
            )

            tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
            for tile in tiles:
                store.write_tile(tile['tile'], tile['coord'], tile['format'],
                                 tile['layer'])

            # TODO log?

    logger.info('batch process ... done')


def tilequeue_main(argv_args=None):
if argv_args is None:
argv_args = sys.argv[1:]
Expand All @@ -1871,6 +2028,7 @@ def tilequeue_main(argv_args=None):
('delete-stuck-tiles', tilequeue_delete_stuck_tiles),
('rawr-process', tilequeue_rawr_process),
('rawr-seed-toi', tilequeue_rawr_seed_toi),
('batch-enqueue', tilequeue_batch_enqueue),
)

def _make_peripherals(cfg):
Expand Down Expand Up @@ -1986,6 +2144,13 @@ def command_fn(cfg, args):
help='path to tile expiry file')
subparser.set_defaults(func=tilequeue_rawr_enqueue)

subparser = subparsers.add_parser('batch-process')
subparser.add_argument('--config', required=True,
help='The path to the tilequeue config file.')
subparser.add_argument('--tile', required=True,
help='Tile coordinate as "z/x/y".')
subparser.set_defaults(func=tilequeue_batch_process)

args = parser.parse_args(argv_args)
assert os.path.exists(args.config), \
'Config file {} does not exist!'.format(args.config)
Expand Down

0 comments on commit 44d5fe6

Please sign in to comment.