Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy things up for a data fetcher interface. #228

Merged
merged 3 commits into from
Aug 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 4 additions & 50 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,13 @@
from collections import namedtuple
from contextlib import closing
from itertools import chain
from jinja2 import Environment
from jinja2 import FileSystemLoader
from multiprocessing.pool import ThreadPool
from tilequeue.config import create_query_bounds_pad_fn
from tilequeue.config import make_config_from_argparse
from tilequeue.format import lookup_format_by_extension
from tilequeue.metro_extract import city_bounds
from tilequeue.metro_extract import parse_metro_extract
from tilequeue.query import DataFetcher
from tilequeue.query import jinja_filter_bbox
from tilequeue.query import jinja_filter_bbox_filter
from tilequeue.query import jinja_filter_bbox_intersection
from tilequeue.query import jinja_filter_bbox_overlaps
from tilequeue.query import jinja_filter_bbox_padded_intersection
from tilequeue.query import jinja_filter_geometry
from tilequeue.query import SourcesQueriesGenerator
from tilequeue.query import TemplateFinder
from tilequeue.query import TemplateQueryGenerator
from tilequeue.query import make_db_data_fetcher
from tilequeue.queue import make_sqs_queue
from tilequeue.tile import coord_int_zoom_up
from tilequeue.tile import coord_is_valid
Expand Down Expand Up @@ -480,42 +469,9 @@ def _parse_postprocess_resources(post_process_item, cfg_path):
return resources


def make_jinja_environment(template_path):
    """
    Returns a jinja2 Environment which loads its templates from
    `template_path` and has the query filters registered on it.
    """

    env = Environment(loader=FileSystemLoader(template_path))

    # (name the filter is registered under, function implementing it)
    query_filters = (
        ('geometry', jinja_filter_geometry),
        ('bbox_filter', jinja_filter_bbox_filter),
        ('bbox_intersection', jinja_filter_bbox_intersection),
        ('bbox_padded_intersection', jinja_filter_bbox_padded_intersection),
        ('bbox', jinja_filter_bbox),
        ('bbox_overlaps', jinja_filter_bbox_overlaps),
    )
    for filter_name, filter_fn in query_filters:
        env.filters[filter_name] = filter_fn

    return env


# Named tuple with two fields: `sources` and `queries_generator`.
SourcesConfig = namedtuple('SourcesConfig', 'sources queries_generator')


def parse_source_data(queries_cfg):
    """
    Returns a list with one source per entry under the 'sources' key of
    `queries_cfg`.

    Each entry must provide a 'template'. 'start_zoom' is optional, is
    coerced with int() and defaults to 0.
    """

    from tilequeue.query import make_source

    return [
        make_source(name, cfg['template'], int(cfg.get('start_zoom', 0)))
        for name, cfg in queries_cfg['sources'].items()
    ]


def make_queries_generator(sources, template_path, reload_templates):
    """
    Returns a SourcesQueriesGenerator for `sources`, backed by a
    TemplateQueryGenerator which finds its templates under `template_path`.
    """

    # templates are cached unless the caller asked for them to be reloaded.
    finder = TemplateFinder(
        make_jinja_environment(template_path), not reload_templates)
    return SourcesQueriesGenerator(sources, TemplateQueryGenerator(finder))


def parse_layer_data(query_cfg, buffer_cfg, cfg_path):
all_layer_names = query_cfg['all']
layers_config = query_cfg['layers']
Expand Down Expand Up @@ -647,11 +603,9 @@ def tilequeue_process(cfg, peripherals):
n_max_io_workers = 50
n_io_workers = min(n_total_needed, n_max_io_workers)
io_pool = ThreadPool(n_io_workers)
sources = parse_source_data(query_cfg)
queries_generator = make_queries_generator(
sources, cfg.template_path, cfg.reload_templates)
feature_fetcher = DataFetcher(
cfg.postgresql_conn_info, queries_generator, io_pool)
feature_fetcher = make_db_data_fetcher(
cfg.postgresql_conn_info, cfg.template_path, cfg.reload_templates,
query_cfg, io_pool)

# create all queues used to manage pipeline

Expand Down
3 changes: 3 additions & 0 deletions tilequeue/query/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from tilequeue.query import postgres

# Re-export the postgres implementation at package level, so that callers can
# write `from tilequeue.query import make_db_data_fetcher`.
make_db_data_fetcher = postgres.make_db_data_fetcher
48 changes: 48 additions & 0 deletions tilequeue/query.py → tilequeue/query/postgres.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from collections import namedtuple
from jinja2 import Environment
from jinja2 import FileSystemLoader
from psycopg2.extras import RealDictCursor
from tilequeue.postgresql import DBConnectionPool
from tilequeue.transform import calculate_padded_bounds
Expand Down Expand Up @@ -203,3 +205,49 @@ def __call__(self, zoom, unpadded_bounds):
read_rows.append(read_row)

return read_rows


def make_jinja_environment(template_path):
    """
    Returns a jinja2 Environment whose loader reads templates from
    `template_path`, with the bbox and geometry filters registered.
    """

    jinja_env = Environment(loader=FileSystemLoader(template_path))
    # keys are the names under which the filters are registered.
    jinja_env.filters.update({
        'geometry': jinja_filter_geometry,
        'bbox_filter': jinja_filter_bbox_filter,
        'bbox_intersection': jinja_filter_bbox_intersection,
        'bbox_padded_intersection': jinja_filter_bbox_padded_intersection,
        'bbox': jinja_filter_bbox,
        'bbox_overlaps': jinja_filter_bbox_overlaps,
    })
    return jinja_env


def make_queries_generator(sources, template_path, reload_templates):
    """
    Returns a SourcesQueriesGenerator over `sources`.

    Queries are generated by a TemplateQueryGenerator, which looks its
    templates up through a TemplateFinder on a jinja environment rooted at
    `template_path`.
    """

    env = make_jinja_environment(template_path)
    # caching is the inverse of the reload flag.
    finder = TemplateFinder(env, not reload_templates)
    per_source_generator = TemplateQueryGenerator(finder)
    return SourcesQueriesGenerator(sources, per_source_generator)


def parse_source_data(queries_cfg):
    """
    Returns a list of sources, one for each entry in
    `queries_cfg['sources']`.

    An entry's 'template' is required. Its 'start_zoom' is optional,
    converted with int() and taken to be 0 when absent.
    """

    def _source_from_entry(name, entry):
        # 'template' is looked up before 'start_zoom', as a missing template
        # should be the error reported for an entry lacking both.
        template = entry['template']
        start_zoom = int(entry.get('start_zoom', 0))
        return make_source(name, template, start_zoom)

    return [
        _source_from_entry(name, entry)
        for name, entry in queries_cfg['sources'].items()
    ]


def make_db_data_fetcher(postgresql_conn_info, template_path, reload_templates,
                         query_cfg, io_pool):
    """
    Returns an object which is callable with the zoom and unpadded bounds and
    which returns a list of rows.

    The sources are parsed from `query_cfg` and their queries are generated
    from the templates under `template_path`. The result is a DataFetcher
    constructed from `postgresql_conn_info`, that queries generator and
    `io_pool`.
    """

    queries_generator = make_queries_generator(
        parse_source_data(query_cfg), template_path, reload_templates)
    return DataFetcher(postgresql_conn_info, queries_generator, io_pool)