Skip to content

Commit

Permalink
Add command to enqueue randomly sampled pyramids
Browse files Browse the repository at this point in the history
  • Loading branch information
rmarianski committed Nov 14, 2017
1 parent 7b5c713 commit 5b8252b
Showing 1 changed file with 104 additions and 8 deletions.
112 changes: 104 additions & 8 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from collections import namedtuple
from contextlib import closing
from itertools import chain
from ModestMaps.Core import Coordinate
from multiprocessing.pool import ThreadPool
from random import randrange
from tilequeue.config import create_query_bounds_pad_fn
from tilequeue.config import make_config_from_argparse
from tilequeue.format import lookup_format_by_extension
Expand Down Expand Up @@ -954,19 +956,30 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
logger.info('%d tiles of interest processed' % n_toi)


def coord_pyramid(coord, zoom_start, zoom_stop):
"""
generate full pyramid for coord
Generate the full pyramid for a single coordinate. Note that zoom_stop is
exclusive.
"""
if zoom_start <= coord.zoom:
yield coord
for child_coord in coord_children_range(coord, zoom_stop):
if zoom_start <= child_coord.zoom:
yield child_coord


def coord_pyramids(coords, zoom_start, zoom_stop):
"""
generate full pyramid for coords
Generate the full pyrmaid for the list of coords. Note that zoom_stop is
Generate the full pyramid for the list of coords. Note that zoom_stop is
exclusive.
"""
for coord in coords:
if zoom_start <= coord.zoom:
yield coord
for child_coord in coord_children_range(coord, zoom_stop):
if zoom_start <= child_coord.zoom:
yield child_coord
for child in coord_pyramid(coord, zoom_start, zoom_stop):
yield child


def tilequeue_enqueue_full_pyramid_from_toi(cfg, peripherals, args):
Expand Down Expand Up @@ -1008,9 +1021,9 @@ def tilequeue_enqueue_full_pyramid_from_toi(cfg, peripherals, args):
coord_at_group_zoom = coord.zoomTo(group_by_zoom).container()
coords_at_group_zoom.add(coord_at_group_zoom)

pyramid = coord_pyramids(coords_at_group_zoom, zoom_start, zoom_stop)
pyramids = coord_pyramids(coords_at_group_zoom, zoom_start, zoom_stop)

coords_to_enqueue = chain(ungrouped, pyramid)
coords_to_enqueue = chain(ungrouped, pyramids)

queue_writer = peripherals.queue_writer
n_queued, n_in_flight = queue_writer.enqueue_batch(coords_to_enqueue)
Expand All @@ -1019,6 +1032,75 @@ def tilequeue_enqueue_full_pyramid_from_toi(cfg, peripherals, args):
logger.info('%d tiles of interest processed' % n_toi)


def tilequeue_enqueue_random_pyramids(cfg, peripherals, args):
"""enqueue random pyramids"""
logger = make_logger(cfg, 'enqueue_random_pyramids')

rawr_yaml = cfg.yml.get('rawr')
assert rawr_yaml, 'Missing rawr yaml'
group_by_zoom = rawr_yaml.get('group-zoom')
assert group_by_zoom, 'Missing rawr group-zoom'
assert isinstance(group_by_zoom, int), 'Invalid rawr group-zoom'

if args.zoom_start is None:
zoom_start = group_by_zoom
else:
zoom_start = args.zoom_start

if args.zoom_stop is None:
zoom_stop = cfg.max_zoom + 1 # +1 because exclusive
else:
zoom_stop = args.zoom_stop

assert zoom_start >= group_by_zoom
assert zoom_stop > zoom_start

gridsize = args.gridsize
total_samples = getattr(args, 'n-samples')
samples_per_cell = total_samples / (gridsize * gridsize)

tileset_dim = 2 ** group_by_zoom

scale_factor = float(tileset_dim) / float(gridsize)

queue_writer = peripherals.queue_writer
n_queued = 0
n_in_flight = 0

for grid_y in xrange(gridsize):
tile_y_min = int(grid_y * scale_factor)
tile_y_max = int((grid_y+1) * scale_factor)
for grid_x in xrange(gridsize):
tile_x_min = int(grid_x * scale_factor)
tile_x_max = int((grid_x+1) * scale_factor)

cell_samples = set()

for i in xrange(samples_per_cell):

while True:
rand_x = randrange(tile_x_min, tile_x_max)
rand_y = randrange(tile_y_min, tile_y_max)
cell = rand_x, rand_y
if cell in cell_samples:
continue
cell_samples.add(cell)
break

# enqueue a cell at a time
# the queue mapper expects to be able to read the entirety of the
# input into memory first
for x, y in cell_samples:
coord = Coordinate(zoom=group_by_zoom, column=x, row=y)
pyramid = coord_pyramid(coord, zoom_start, zoom_stop)
cell_enqueued, cell_in_flight = queue_writer.enqueue_batch(
pyramid)
n_queued += cell_enqueued
n_in_flight += cell_in_flight

logger.info('%d enqueued - %d in flight' % (n_queued, n_in_flight))


def tilequeue_consume_tile_traffic(cfg, peripherals):
logger = make_logger(cfg, 'consume_tile_traffic')
logger.info('Consuming tile traffic logs ...')
Expand Down Expand Up @@ -1871,6 +1953,20 @@ def command_fn(cfg, args):
func=_make_peripherals_with_args_command(
tilequeue_enqueue_full_pyramid_from_toi))

subparser = subparsers.add_parser('enqueue-random-pyramids')
subparser.add_argument('--config', required=True,
help='The path to the tilequeue config file.')
subparser.add_argument('--zoom-start', type=int, required=False,
default=None, help='Zoom start')
subparser.add_argument('--zoom-stop', type=int, required=False,
default=None, help='Zoom stop, exclusive')
subparser.add_argument('gridsize', type=int, help='Dimension of grid size')
subparser.add_argument('n-samples', type=int,
help='Number of total samples')
subparser.set_defaults(
func=_make_peripherals_with_args_command(
tilequeue_enqueue_random_pyramids))

subparser = subparsers.add_parser('rawr-enqueue')
subparser.add_argument('--config', required=True,
help='The path to the tilequeue config file.')
Expand Down

0 comments on commit 5b8252b

Please sign in to comment.