Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused commands #254

Merged
merged 1 commit into from
Oct 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions logging.conf.sample
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[loggers]
keys=root,process,seed,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic,tile_status,stuck_tiles,delete_stuck_tiles,rawr_enqueue,rawr_process
keys=root,process,seed,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic,tile_status,stuck_tiles,delete_stuck_tiles,rawr_enqueue,rawr_process

[handlers]
keys=consoleHandler
Expand Down Expand Up @@ -47,12 +47,6 @@ handlers=consoleHandler
qualName=seed
propagate=0

[logger_drain]
level=INFO
handlers=consoleHandler
qualName=drain
propagate=0

[logger_wof_process_neighbourhoods]
level=INFO
handlers=consoleHandler
Expand Down
196 changes: 0 additions & 196 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,19 +361,6 @@ def make_seed_tile_generator(cfg):
return tile_generator


def tilequeue_drain(cfg, peripherals):
logger = make_logger(cfg, 'drain')
logger.info('Draining queues ...')
queue_mapper = peripherals.queue_mapper
total_cleared = 0
for _, tile_queue in queue_mapper.queues_in_priority_order():
n = tile_queue.clear()
if n is not None and n > 0:
total_cleared += n
logger.info('Draining queues ... done')
logger.info('Removed %d messages' % total_cleared)


def explode_and_intersect(coord_ints, tiles_of_interest, until=0):

next_coord_ints = coord_ints
Expand Down Expand Up @@ -1184,187 +1171,6 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
time_overall.stop()


def tilequeue_tile_sizes(cfg, peripherals):
# find averages, counts, and medians for metro extract tiles
assert cfg.metro_extract_url
with closing(urlopen(cfg.metro_extract_url)) as fp:
metro_extracts = parse_metro_extract(fp)

# zooms to get sizes for, inclusive
zoom_start = 11
zoom_until = 15

bucket_name = cfg.s3_bucket

formats = lookup_formats(cfg.output_formats)

work_buffer_size = 1000
work = Queue.Queue(work_buffer_size)

from boto import connect_s3
from boto.s3.bucket import Bucket
s3_conn = connect_s3(cfg.aws_access_key_id, cfg.aws_secret_access_key)
bucket = Bucket(s3_conn, bucket_name)

lock = threading.Lock()

def new_total_count():
return dict(
sum=0,
n=0,
elts=[],
)

region_counts = {}
city_counts = {}
zoom_counts = {}
format_counts = {}
grand_total_count = new_total_count()

def update_total_count(total_count, size):
total_count['sum'] += size
total_count['n'] += 1
total_count['elts'].append(size)

def add_size(metro, coord, format, size):
with lock:
region_count = region_counts.get(metro.region)
if region_count is None:
region_counts[metro.region] = region_count = new_total_count()
update_total_count(region_count, size)

city_count = city_counts.get(metro.city)
if city_count is None:
city_counts[metro.city] = city_count = new_total_count()
update_total_count(city_count, size)

zoom_count = zoom_counts.get(coord.zoom)
if zoom_count is None:
zoom_counts[coord.zoom] = zoom_count = new_total_count()
update_total_count(zoom_count, size)

format_count = format_counts.get(format.extension)
if format_count is None:
format_counts[format.extension] = format_count = \
new_total_count()
update_total_count(format_count, size)

update_total_count(grand_total_count, size)

from tilequeue.tile import serialize_coord

def process_work_data():
while True:
work_data = work.get()
if work_data is None:
break
coord = work_data['coord']
format = work_data['format']
key_path = 'osm/all/%s.%s' % (
serialize_coord(coord), format.extension)
key = bucket.get_key(key_path)
# this shouldn't practically happen
if key is None:
continue
size = key.size
add_size(work_data['metro'], coord, format, size)

# start all threads
n_threads = 50
worker_threads = []
for i in range(n_threads):
worker_thread = threading.Thread(target=process_work_data)
worker_thread.start()
worker_threads.append(worker_thread)

# enqueue all work
for metro_extract in metro_extracts:
metro_tiles = tile_generator_for_single_bounds(
metro_extract.bounds, zoom_start, zoom_until)
for tile in metro_tiles:
for format in formats:
work_data = dict(
metro=metro_extract,
coord=tile,
format=format,
)
work.put(work_data)

# tell workers to stop
for i in range(n_threads):
work.put(None)
for worker_thread in worker_threads:
worker_thread.join()

def calc_median(elts):
if not elts:
return -1
elts.sort()
n = len(elts)
middle = n / 2
if n % 2 == 0:
return (float(elts[middle]) + float(elts[middle + 1])) / float(2)
else:
return elts[middle]

def calc_avg(total, n):
if n == 0:
return -1
return float(total) / float(n)

def format_commas(x):
return '{:,}'.format(x)

def format_kilos(size_in_bytes):
kilos = int(float(size_in_bytes) / float(1000))
kilos_commas = format_commas(kilos)
return '%sK' % kilos_commas

# print results
def print_count(label, total_count):
median = calc_median(total_count['elts'])
avg = calc_avg(total_count['sum'], total_count['n'])
if label:
label_str = '%s -> ' % label
else:
label_str = ''
print '%scount: %s - avg: %s - median: %s' % (
label_str, format_commas(total_count['n']),
format_kilos(avg), format_kilos(median))

print 'Regions'
print '*' * 80
region_counts = sorted(region_counts.iteritems())
for region_name, region_count in region_counts:
print_count(region_name, region_count)

print '\n\n'
print 'Cities'
print '*' * 80
city_counts = sorted(city_counts.iteritems())
for city_name, city_count in city_counts:
print_count(city_name, city_count)

print '\n\n'
print 'Zooms'
print '*' * 80
zoom_counts = sorted(zoom_counts.iteritems())
for zoom, zoom_count in zoom_counts:
print_count(zoom, zoom_count)

print '\n\n'
print 'Formats'
print '*' * 80
format_counts = sorted(format_counts.iteritems())
for format_extension, format_count in format_counts:
print_count(format_extension, format_count)

print '\n\n'
print 'Grand total'
print '*' * 80
print_count(None, grand_total_count)


def tilequeue_process_wof_neighbourhoods(cfg, peripherals):
from tilequeue.rawr import make_rawr_enqueuer
from tilequeue.wof import make_wof_model
Expand Down Expand Up @@ -1790,12 +1596,10 @@ def tilequeue_main(argv_args=None):
cfg_commands = (
('process', tilequeue_process),
('seed', tilequeue_seed),
('drain', tilequeue_drain),
('dump-tiles-of-interest', tilequeue_dump_tiles_of_interest),
('load-tiles-of-interest', tilequeue_load_tiles_of_interest),
('enqueue-tiles-of-interest', tilequeue_enqueue_tiles_of_interest),
('prune-tiles-of-interest', tilequeue_prune_tiles_of_interest),
('tile-size', tilequeue_tile_sizes),
('wof-process-neighbourhoods', tilequeue_process_wof_neighbourhoods),
('wof-load-initial-neighbourhoods',
tilequeue_initial_load_wof_neighbourhoods),
Expand Down