Skip to content

Commit

Permalink
Fix advanced provider options for dags that override ingest records (#…
Browse files Browse the repository at this point in the history
…4214)

* Add support for fixed_query_params to ProviderDataIngester

* Update ScienceMuseum to use new base class features

* Update other dags that override ingest_records to use new feature

* Update TimeDelineatedProviderDataIngester to work with new features

* Fix tests

* Wait to raise skipped ingestion errors until all rounds are done

* Make TimeDelineatedProviderDataIngester more flexible to allow additional fixed query params

* Fix issue with Finnish museums initial_query_params

* Add more test coverage

* Fix error when initial_query_params not provided for Finnish

* Base Science Museum page number off previous params

* Make logs more readable

* Fix typo

Co-authored-by: Krystle Salazar <krystle.salazar@automattic.com>

---------

Co-authored-by: Krystle Salazar <krystle.salazar@automattic.com>
  • Loading branch information
stacimc and krysal authored May 1, 2024
1 parent f72edc0 commit 7f4fb7c
Show file tree
Hide file tree
Showing 35 changed files with 428 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, *args, **kwargs):
}
}

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
# Return default query params on the first request
# primaryRepresentation contain a image url for each data
# "+" is a query string syntax for must be present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs):
self.api_key = Variable.get("API_KEY_BROOKLYN_MUSEUM")
self.headers = {"api_key": self.api_key}

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
return {
"has_images": 1,
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/cc_mixter.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(self, *args, **kwargs):
delay=self.delay, headers=self.headers
)

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
# This means this is the first request, so we start with offset 0.
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ClevelandDataIngester(ProviderDataIngester):
batch_limit = 1000
delay = 5

def get_next_query_params(self, prev_query_params, **kwargs):
def get_next_query_params(self, prev_query_params: dict | None):
if not prev_query_params:
# Return default query params on the first request
return {"cc": "1", "has_image": "1", "limit": self.batch_limit, "skip": 0}
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/europeana.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def _get_timestamp_query_param(self, date):

return f"timestamp_update:[{start_timestamp} TO {end_timestamp}]"

def get_next_query_params(self, prev_query_params) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
return self.base_request_body

Expand Down
87 changes: 73 additions & 14 deletions catalog/dags/providers/provider_api_scripts/finnish_museums.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
"""

import logging
from datetime import datetime
from itertools import chain

from airflow.exceptions import AirflowSkipException

from common.licenses import LicenseInfo, get_license_info
from common.loader import provider_details as prov
from providers.provider_api_scripts.time_delineated_provider_data_ingester import (
Expand Down Expand Up @@ -54,23 +57,79 @@ class FinnishMuseumsDataIngester(TimeDelineatedProviderDataIngester):
min_divisions = 12
max_divisions = 20

def ingest_records(self, **kwargs):
for building in self.buildings:
logger.info(f"Obtaining images of building {building}")
super().ingest_records(building=building)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# The Finnish Museums API expected double quotes in the params (for
# example, 'format:"0/Image"'). Since query params supplied through the
# DagRun conf advanced options must be supplied as valid json with outer
# double quotes, these inner quotes must be escaped in the form
# (i.e., "format:\"0/Image\""). For convenience and to prevent the DAG
# from breaking if the user inputs single quotes, they are replaced here.
self.initial_query_params = self._replace_single_quotes_in_params(
self.initial_query_params
)

def get_next_query_params(self, prev_query_params, **kwargs):
if not prev_query_params:
building = kwargs.get("building")
start_ts = self.format_ts(kwargs.get("start_ts"))
end_ts = self.format_ts(kwargs.get("end_ts"))
if self.override_query_params_list:
self.override_query_params = (
self._replace_single_quotes_in_params(qp)
for qp in self.override_query_params_list
)

def _replace_single_quotes_in_params(self, params: dict | None):
"""Ensure that the values in the "filter[]" param do not contain single quotes."""
if not params:
return None

return {
**params,
"filter[]": [
param.replace("'", '"') for param in params.get("filter[]", [])
],
}

def get_fixed_query_params(self):
"""
Get the set of fixed params. Ingestion will be run separately for each of
these.
"""
fixed_query_params = []

# Build out timestamp pairs for each building.
for building in self.buildings:
logger.info(f"Generating timestamp pairs for {building}.")
timestamp_pairs = super()._get_timestamp_pairs(building=building)
fixed_query_params += [
self.get_timestamp_query_params(start, end, building=building)
for start, end in timestamp_pairs
]

# If no fixed_query_params were generated (which happens when there are no
# records for this ingestion date for any of the buildings), then skip
if not fixed_query_params:
raise AirflowSkipException("No data to ingest.")

return fixed_query_params

def get_timestamp_query_params(
self, start: datetime, end: datetime, **kwargs
) -> dict:
"""Format the timestamp params appropriately."""
building = kwargs.get("building")
return {
"filter[]": [
f'format:"{self.format_type}"',
f'building:"{building}"',
f'last_indexed:"[{self.format_ts(start)} TO {self.format_ts(end)}]"',
]
}

def get_next_query_params(self, prev_query_params: dict | None):
"""
Get the next query params, based on the previous ones. Fixed query params
will be merged in by the base class as appropriate.
"""
if not prev_query_params:
return {
"filter[]": [
f'format:"{self.format_type}"',
f'building:"{building}"',
f'last_indexed:"[{start_ts} TO {end_ts}]"',
],
"field[]": [
"authors",
"buildings",
Expand Down
42 changes: 28 additions & 14 deletions catalog/dags/providers/provider_api_scripts/flickr.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def __init__(self, *args, **kwargs):
# during different stages of the ingestion process.
self.process_large_batch = False

# Keep track of the current timestamp pair being processed.
self.current_timestamp_pair = ()

def ingest_records(self, **kwargs):
"""
Ingest records, handling large batches.
Expand All @@ -105,7 +108,7 @@ def ingest_records(self, **kwargs):
all the unique records over this time period. We will process up to the
max_unique_records count and then move on to the next batch.
"""
# Perform ingestion as normal, splitting requests into time-slices of at most
# Perform full ingestion as normal, splitting requests into time-slices of at most
# 5 minutes. When a batch is encountered which contains more than
# max_unique_records, it is skipped and added to the `large_batches` list for
# later processing.
Expand All @@ -121,8 +124,13 @@ def ingest_records(self, **kwargs):
# For each large batch, ingest records for that interval one license
# type at a time.
for license_ in LICENSE_INFO.keys():
super().ingest_records_for_timestamp_pair(
start_ts=start_ts, end_ts=end_ts, license=license_
super()._ingest_records(
initial_query_params=None,
fixed_query_params={
"min_upload_date": start_ts,
"max_upload_date": end_ts,
"license": license_,
},
)
logger.info("Completed large batch processing by license type.")

Expand All @@ -131,22 +139,28 @@ def ingest_records(self, **kwargs):
# additional requests when generating timestamp pairs.
logger.info(f"Made {self.requests_count + 25} requests to the Flickr API.")

def get_next_query_params(self, prev_query_params, **kwargs):
if not prev_query_params:
# Initial request, return default params
start_timestamp = kwargs.get("start_ts")
end_timestamp = kwargs.get("end_ts")
def _ingest_records(
self, initial_query_params: dict | None, fixed_query_params: dict | None
) -> None:
# Update `current_timestamp_pair` to keep track of what we are processing.
self.current_timestamp_pair = (
fixed_query_params["min_upload_date"],
fixed_query_params["max_upload_date"],
)
return super()._ingest_records(initial_query_params, fixed_query_params)

# license will be available in the params if we're dealing
# with a large batch. If not, fall back to all licenses
license_ = kwargs.get("license", self.default_license_param)
def get_timestamp_query_params(
self, start: datetime, end: datetime, **kwargs
) -> dict:
return {"min_upload_date": start, "max_upload_date": end}

def get_next_query_params(self, prev_query_params: dict | None):
if not prev_query_params:
# Initial request, return default params
return {
"min_upload_date": start_timestamp,
"max_upload_date": end_timestamp,
"page": 0,
"api_key": self.api_key,
"license": license_,
"license": self.default_license_param,
"per_page": self.batch_limit,
"method": "flickr.photos.search",
"media": "photos",
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/freesound.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, *args, **kwargs):

super().__init__(*args, **kwargs)

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
start_date = "*"
# Allow self.date to be undefined, necessary for the first full, successful
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/inaturalist.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
class INaturalistDataIngester(ProviderDataIngester):
providers = {"image": provider_details.INATURALIST_DEFAULT_PROVIDER}

def get_next_query_params(self, prev_query_params=None, **kwargs):
def get_next_query_params(self, prev_query_params=None):
raise NotImplementedError(
"Instead we use get_batches to dynamically create subtasks."
)
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/jamendo.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class JamendoDataIngester(ProviderDataIngester):
batch_limit = 200
headers = {"Accept": "application/json"}

def get_next_query_params(self, prev_query_params, **kwargs):
def get_next_query_params(self, prev_query_params):
if not prev_query_params:
# On first request, build default params.
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class JusttakeitfreeDataIngester(ProviderDataIngester):
creator = "Justtakeitfree Free Photos"
creator_url = "https://justtakeitfree.com"

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
return {"page": 1, "key": Variable.get("API_KEY_JUSTTAKEITFREE")}
else:
Expand Down
10 changes: 4 additions & 6 deletions catalog/dags/providers/provider_api_scripts/museum_victoria.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,17 @@ def __init__(self, *args, **kwargs):
# This set is used to prevent duplicate images of the same items
self.RECORDS_IDS = set()

def ingest_records(self, **kwargs):
for license_ in self.LICENSE_LIST:
super().ingest_records(license_=license_)

def get_batch_data(self, response_json):
return response_json or None

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_fixed_query_params(self):
return [{"imagelicense": license_} for license_ in self.LICENSE_LIST]

def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
return {
"hasimages": "yes",
"perpage": self.batch_limit,
"imagelicense": kwargs["license_"],
"page": 0,
}
else:
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/nappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class NappyDataIngester(ProviderDataIngester):
"https://creativecommons.org/publicdomain/zero/1.0/"
)

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if not prev_query_params:
return {
"page": 1,
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/nypl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(self, *args, **kwargs):
NyplDataIngester.headers = {"Authorization": f"Token token={NYPL_API}"}
super().__init__(*args, **kwargs)

def get_next_query_params(self, prev_query_params, **kwargs):
def get_next_query_params(self, prev_query_params):
if not prev_query_params:
return {
"q": "CC_0",
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_api_scripts/phylopic.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _get_initial_query_params(self) -> None:
f"Total pages: {self.total_pages}."
)

def get_next_query_params(self, prev_query_params: dict | None, **kwargs) -> dict:
def get_next_query_params(self, prev_query_params: dict | None) -> dict:
if prev_query_params is not None:
self.current_page += 1

Expand Down
Loading

0 comments on commit 7f4fb7c

Please sign in to comment.