Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add iNaturalist.org metadata #549

Merged
merged 98 commits into main from issue/439-add-provider-iNaturalist on Aug 19, 2022
Commits (98)
0dc32e3
templated changes from existing script/readme
rwidom May 10, 2022
e44b2c8
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom May 16, 2022
23419ba
first stab at transformation using pandas
rwidom May 22, 2022
2c98dd1
Merge branch 'main' for thumbnails etc into issue/439-add-provider-iN…
rwidom May 26, 2022
0c2f67c
add capability to load .gz files from s3
rwidom May 28, 2022
7f357c0
load data from S3 and store transformation logic
rwidom May 30, 2022
3cd9db3
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jun 3, 2022
c7ebeba
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jun 12, 2022
faf6716
clean up old file structure and files
rwidom Jun 12, 2022
e78f9a3
add test source files
rwidom Jun 13, 2022
1a29f00
cleanup file structure
rwidom Jun 15, 2022
b145238
get sample data into minio container
rwidom Jun 21, 2022
fc2dc54
add sql transformation steps
rwidom Jun 21, 2022
d4b2a56
hide minio testing side effects from git
rwidom Jun 21, 2022
15d2d65
add python wrappers for airflow and testing
rwidom Jun 21, 2022
59ece62
fyi on the source of the sample data
rwidom Jun 21, 2022
0c1f65c
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jun 21, 2022
0c0fd9e
Minor tweaks to get this working locally
AetherUnbound Jun 22, 2022
067a16c
Update .gitignore
rwidom Jun 23, 2022
1053d3b
Linting
rwidom Jul 3, 2022
b1cc628
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 8, 2022
697b72e
add separate s3 load step to dev docker setup
rwidom Jul 8, 2022
89ce375
airflow loads s3 & calls data ingestor to save tsv
rwidom Jul 9, 2022
ab9f549
make bucket loading piece readable
rwidom Jul 9, 2022
3cab54c
remove unnecessary gitignore change
rwidom Jul 9, 2022
900de4f
put back basic minio volume
rwidom Jul 9, 2022
f72c251
fix count typo in testing comment
rwidom Jul 9, 2022
b54c4c5
linting sql files from sql fluff
rwidom Jul 10, 2022
59cf0b8
fixing linting capitalization error
rwidom Jul 12, 2022
4449811
strengthen file references
rwidom Jul 13, 2022
76111cc
move pg hook to init to reduce overhead
rwidom Jul 13, 2022
e4b61c4
Aggregate dependencies into reusable just recipe, add load_to_s3
AetherUnbound Jul 14, 2022
7fd1102
Add check file exists on S3 to DAG
rwidom Jul 14, 2022
bd1bd5f
Merge branch 'issue/439-add-provider-iNaturalist' of https://github.c…
rwidom Jul 14, 2022
2509db2
fix ingester class initialization override
rwidom Jul 15, 2022
ea335ba
stop overwriting pg temp files
rwidom Jul 15, 2022
10cc426
fix typo in order of callables
rwidom Jul 15, 2022
dcba976
dag runs with python pagination, though tests fail
rwidom Jul 15, 2022
115c7ae
sql for python pagination
rwidom Jul 15, 2022
93d576f
create and manage larger test data files
rwidom Jul 17, 2022
064e223
performant SQL to export JSON
rwidom Jul 17, 2022
464ffe8
landing url not langing url
rwidom Jul 17, 2022
b97b8c7
temp file name with timestamp for postgres
rwidom Jul 17, 2022
afb9cb3
inaturalist provider ingester + testing
rwidom Jul 17, 2022
0b38630
removed likely to error command
rwidom Jul 17, 2022
ca7564b
committing workflow for review, not complete
rwidom Jul 17, 2022
9581c12
use batch_limit
rwidom Jul 19, 2022
cff5c5d
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 20, 2022
2b5c53e
complete the workflow cycle
rwidom Jul 21, 2022
bfe7b31
make preingestion factory logic more concise
rwidom Jul 21, 2022
b15d989
check s3 file update date to run dag daily
rwidom Jul 21, 2022
bf452b4
testing with resource files instead of minio
rwidom Jul 23, 2022
7b0854a
more concise batch data testing for none
rwidom Jul 23, 2022
fe3d0c2
monthly run, checking if files have been updated
rwidom Jul 23, 2022
5b0356c
linting
rwidom Jul 23, 2022
52f3b13
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 23, 2022
f515bda
pass tags as a list
rwidom Jul 24, 2022
20b6218
list reformat linting
rwidom Jul 24, 2022
e2c1c9b
more readable bash command
rwidom Jul 27, 2022
1a1d940
inaturalist --> iNaturalist in class name
rwidom Jul 27, 2022
b2e1177
old_query_params --> prev_query_params
rwidom Jul 27, 2022
42d58e9
improved description for sample data file manager
rwidom Jul 27, 2022
95ac706
parametrize tests for missing data
rwidom Jul 27, 2022
e73c5ea
more concise / non-redundant get usage
rwidom Jul 27, 2022
2ac5dac
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 27, 2022
216614e
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 28, 2022
337c1d7
streamline checking for new data
rwidom Jul 28, 2022
212f91d
remove unnecessary logging and imports
rwidom Jul 28, 2022
4894f72
Merge 'main' -> issue/439-add-provider-iNaturalist
rwidom Jul 28, 2022
43fe04f
iNaturalistDataIngester -> INaturalistDataIngester
rwidom Jul 28, 2022
87515e0
don't need to make foreign ID a string up front
rwidom Jul 28, 2022
a263f31
clarify comment on observer FK
rwidom Jul 28, 2022
dd690d7
more readable use of sql template
rwidom Jul 28, 2022
788f69e
streamline transformations in postgres
rwidom Jul 29, 2022
989ce98
add test case with no ancestry info / tags
rwidom Jul 29, 2022
5f25ca9
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 29, 2022
3bb7f65
linting test db response
rwidom Jul 29, 2022
772eb0b
remove env check because file won't be in prod
rwidom Jul 30, 2022
1db92b0
Add SQL tests
rwidom Aug 1, 2022
a32f1d6
remove universal tags
rwidom Aug 1, 2022
6c78765
adjusted db response without "life" tag
rwidom Aug 1, 2022
1bbcb6e
lint db response
rwidom Aug 1, 2022
b71fbdf
remove life from hard coded test results
rwidom Aug 1, 2022
4e20e32
drop source tables and schema after ingestion
rwidom Aug 1, 2022
e21fecb
nicer feedback on initial load test with pytest
rwidom Aug 2, 2022
0100a7e
try adding default image category
rwidom Aug 2, 2022
9eed1f9
Print logs on failure
AetherUnbound Aug 3, 2022
dce23fb
Revert "Print logs on failure"
AetherUnbound Aug 3, 2022
2a4074e
post-ingestion task comment
rwidom Aug 3, 2022
a6ff6df
clarify and simplify test
rwidom Aug 4, 2022
ce2df38
simplify sql names and remove dedent
rwidom Aug 9, 2022
dc6887c
clarify duplicate photo id status
rwidom Aug 12, 2022
ec53e94
lint fix
rwidom Aug 12, 2022
c039754
documentation for pre- and post-ingestion task params
rwidom Aug 12, 2022
f21af29
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Aug 13, 2022
6282c17
Update DAGs.md
AetherUnbound Aug 16, 2022
27506d8
add issue references to comments
rwidom Aug 19, 2022
44418e9
fix comment typo
rwidom Aug 19, 2022
26 changes: 26 additions & 0 deletions DAGs.md
@@ -81,6 +81,7 @@ The following are DAGs grouped by their primary tag:
| `finnish_museums_workflow` | `@monthly` | `False` | image |
| [`flickr_workflow`](#flickr_workflow) | `@daily` | `True` | image |
| [`freesound_workflow`](#freesound_workflow) | `@monthly` | `False` | audio |
| [`inaturalist_workflow`](#inaturalist_workflow) | `@monthly` | `False` | image |
| [`jamendo_workflow`](#jamendo_workflow) | `@monthly` | `False` | audio |
| [`metropolitan_museum_workflow`](#metropolitan_museum_workflow) | `@daily` | `True` | image |
| `museum_victoria_workflow` | `@monthly` | `False` | image |
@@ -117,6 +118,7 @@ The following is documentation associated with each DAG (where available):
1. [`flickr_workflow`](#flickr_workflow)
1. [`freesound_workflow`](#freesound_workflow)
1. [`image_data_refresh`](#image_data_refresh)
1. [`inaturalist_workflow`](#inaturalist_workflow)
1. [`jamendo_workflow`](#jamendo_workflow)
1. [`metropolitan_museum_workflow`](#metropolitan_museum_workflow)
1. [`oauth2_authorization`](#oauth2_authorization)
@@ -292,6 +294,30 @@ https://github.com/WordPress/openverse-catalog/issues/353)
https://github.com/WordPress/openverse-catalog/issues/453)


## `inaturalist_workflow`


Provider: iNaturalist

Output: TSV file containing the media metadata.

Notes: [The iNaturalist API is not intended for data scraping.](https://api.inaturalist.org/v1/docs/)

[But there is a full dump intended for sharing on S3.](https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata)

Because these are very large normalized tables, as opposed to more document-oriented
API responses, we found that bringing the data into postgres first was the most
effective approach. [More detail in Slack here.](https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N)

We use the table structure defined
[here](https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql),
except for adding ancestry tags to the taxa table.
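
To make this concrete, the sketch below shows the general shape of the batching
(illustrative only — the table and column names are assumptions loosely based on the
upstream `structure.sql`, not the DAG's actual `export_to_json.template.sql`):

```python
# Hedged sketch of the Postgres-first batching idea; names are assumptions,
# not the PR's real SQL template.
import psycopg2

BATCH_LIMIT = 10_000  # the batch size this PR settles on

EXPORT_SQL = """
SELECT json_agg(records)
FROM (
    SELECT photo_id AS foreign_identifier, license, width, height
    FROM inaturalist.photos
    ORDER BY photo_id
    LIMIT %(batch_limit)s OFFSET %(offset_num)s
) AS records;
"""


def iter_batches(conn):
    """Yield one JSON batch at a time until the query returns no rows."""
    offset = 0
    with conn.cursor() as cur:
        while True:
            cur.execute(EXPORT_SQL, {"batch_limit": BATCH_LIMIT, "offset_num": offset})
            batch = cur.fetchone()[0]  # json_agg returns a single JSON value, or NULL
            if not batch:
                break
            yield batch
            offset += BATCH_LIMIT
```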



## `jamendo_workflow`


32 changes: 31 additions & 1 deletion docker-compose.override.yml
@@ -28,7 +28,7 @@ services:
MINIO_ROOT_PASSWORD: ${AWS_SECRET_KEY}
# Comma separated list of buckets to create on startup
BUCKETS_TO_CREATE: ${OPENVERSE_BUCKET},openverse-airflow-logs,commonsmapper-v2,commonsmapper
# Create the buckets on every container startup
# Create empty buckets on every container startup
# Note: $0 is included in the exec because "/bin/bash -c" swallows the first
# argument, so it must be re-added at the beginning of the exec call
entrypoint: >-
@@ -46,6 +46,36 @@
timeout: 20s
retries: 3

load_to_s3:
image: minio/mc:latest
env_file:
- .env
depends_on:
- s3
volumes:
# Buckets for testing provider data imported from s3 are subdirectories under
# /tests/s3-data/
- ./tests/s3-data:/data:rw
# Loop through subdirectories mounted to the volume and load them to s3/minio.
# This takes care of filesystem delays on some local dev environments that may make
# minio miss files included directly in the minio volume.
# More info here: https://stackoverflow.com/questions/72867045
# This does *not* allow for testing permissions issues that may come up in real AWS.
# And, if you remove files from /tests/s3-data, you will need to use `just down -v`
# and `just up` or `just recreate` to see the minio bucket without those files.
entrypoint: >
/bin/sh -c "
/usr/bin/mc config host add s3 http://s3:5000 ${AWS_ACCESS_KEY} ${AWS_SECRET_KEY};
cd /data;
for b in */ ; do
echo \"Loading bucket $$b\"
/usr/bin/mc mb --ignore-existing s3/$$b
/usr/bin/mc cp --recursive $$b s3/$$b
/usr/bin/mc ls s3/$$b;
done ;
exit 0;
"

# Dev changes for the webserver container
webserver:
depends_on:
9 changes: 7 additions & 2 deletions docker/local_postgres/0002_aws_s3_mock.sql
@@ -13,22 +13,27 @@ LANGUAGE plpython3u
AS $$
import os
import boto3
from datetime import datetime as dt
s3_obj = boto3.resource(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY', 'test_key'),
aws_secret_access_key=os.getenv('AWS_SECRET_KEY', 'test_secret'),
region_name=region,
endpoint_url=os.getenv('S3_LOCAL_ENDPOINT', 'http://s3:5000')
).Object(bucket, file_path)
temp_location = '/tmp/postgres_loading.tsv'
temp_location = f"/tmp/pg_load_{dt.now().timestamp()}_{file_path.split('/')[-1]}"
s3_obj.download_file(temp_location)
if file_path[-3:]=='.gz':
copy_from = f"PROGRAM 'gzip -dc {temp_location}'"
else:
copy_from = plpy.quote_literal(temp_location)
with open(temp_location) as f:
columns = '({})'.format(column_list) if column_list else ''
res = plpy.execute(
'COPY {} {} FROM {} {};'.format(
table_name,
columns,
plpy.quote_literal(temp_location),
copy_from,
options
)
)
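
For context, callers exercise this mock the same way they would the real `aws_s3`
extension. The sketch below is a hedged illustration — the function name and argument
order follow the RDS `aws_s3.table_import_from_s3` signature, and the bucket, key,
and COPY options are placeholders rather than values verified against this file:

```python
# Hedged sketch: calling the mocked import from Python via the Airflow Postgres hook.
# Function name, argument order, and COPY options are assumptions.
from airflow.providers.postgres.hooks.postgres import PostgresHook
from common.constants import POSTGRES_CONN_ID

pg = PostgresHook(POSTGRES_CONN_ID)
pg.run(
    """
    SELECT aws_s3.table_import_from_s3(
        'inaturalist.photos',        -- target table
        '',                          -- column list ('' means all columns)
        '(FORMAT csv, HEADER)',      -- COPY options appended by the mock
        'inaturalist-open-data',     -- bucket
        'photos.csv.gz',             -- key; the mock gunzips .gz files
        'us-east-1'                  -- region
    );
    """
)
```
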
10 changes: 7 additions & 3 deletions justfile
@@ -52,8 +52,12 @@ deploy:
lint:
pre-commit run --all-files

# Load any dependencies necessary for actions on the stack without running the webserver
_deps:
@just up "postgres s3 load_to_s3"

# Mount the tests directory and run a particular command
@_mount-tests command: (up "postgres s3")
@_mount-tests command: _deps
# The test directory is mounted into the container only during testing
docker-compose {{ DOCKER_FILES }} run \
-v {{ justfile_directory() }}/tests:/usr/local/airflow/tests/ \
@@ -75,7 +79,7 @@ shell: up
docker-compose {{ DOCKER_FILES }} exec {{ SERVICE }} /bin/bash

# Launch an IPython REPL using the webserver image
ipython: (up "postgres s3")
ipython: _deps
docker-compose {{ DOCKER_FILES }} run \
--rm \
-w /usr/local/airflow/openverse_catalog/dags \
@@ -84,7 +88,7 @@
/usr/local/airflow/.local/bin/ipython

# Run a given command using the webserver image
run *args: (up "postgres s3")
run *args: _deps
docker-compose {{ DOCKER_FILES }} run --rm {{ SERVICE }} {{ args }}

# Launch a pgcli shell on the postgres container (defaults to openledger) use "airflow" for airflow metastore
2 changes: 2 additions & 0 deletions openverse_catalog/dags/common/loader/provider_details.py
@@ -31,6 +31,7 @@
STOCKSNAP_DEFAULT_PROVIDER = "stocksnap"
WORDPRESS_DEFAULT_PROVIDER = "wordpress"
FREESOUND_DEFAULT_PROVIDER = "freesound"
INATURALIST_DEFAULT_PROVIDER = "inaturalist"

# Finnish parameters
FINNISH_SUB_PROVIDERS = {
@@ -130,6 +131,7 @@ class ImageCategory(Enum):
"deviantart": ImageCategory.DIGITIZED_ARTWORK.value,
"digitaltmuseum": ImageCategory.DIGITIZED_ARTWORK.value,
"floraon": ImageCategory.PHOTOGRAPH.value,
"inaturalist": ImageCategory.PHOTOGRAPH.value,
"mccordmuseum": ImageCategory.DIGITIZED_ARTWORK.value,
"met": ImageCategory.DIGITIZED_ARTWORK.value,
"museumsvictoria": ImageCategory.DIGITIZED_ARTWORK.value,
166 changes: 166 additions & 0 deletions openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py
@@ -0,0 +1,166 @@
"""
Provider: iNaturalist

Output: TSV file containing the media metadata.

Notes: [The iNaturalist API is not intended for data scraping.]
(https://api.inaturalist.org/v1/docs/)

[But there is a full dump intended for sharing on S3.]
(https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata)

Because these are very large normalized tables, as opposed to more document
oriented API responses, we found that bringing the data into postgres first
was the most effective approach. [More detail in slack here.]
(https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N)

We use the table structure defined [here,]
(https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql)
except for adding ancestry tags to the taxa table.
"""

import os
from pathlib import Path
from typing import Dict

import pendulum
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup
from common.constants import POSTGRES_CONN_ID
from common.licenses import LicenseInfo, get_license_info
from common.loader import provider_details as prov
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


AWS_CONN_ID = os.getenv("AWS_CONN_ID", "test_conn_id")
PROVIDER = prov.INATURALIST_DEFAULT_PROVIDER
SCRIPT_DIR = Path(__file__).parents[1] / "provider_csv_load_scripts/inaturalist"
SOURCE_FILE_NAMES = ["photos", "observations", "taxa", "observers"]


class INaturalistDataIngester(ProviderDataIngester):

providers = {"image": prov.INATURALIST_DEFAULT_PROVIDER}

def __init__(self, *kwargs):
super(INaturalistDataIngester, self).__init__()
self.pg = PostgresHook(POSTGRES_CONN_ID)

# adjustments to buffer limits. TO DO: try to integrate this with the dev
# environment logic in the base class, rather than just over-writing it.
# See https://github.com/WordPress/openverse-catalog/issues/682
self.media_stores["image"].buffer_length = 10_000
self.batch_limit = 10_000
[Review thread on the 10_000 batch size]

Contributor: Out of curiosity: how was 10k decided on?

rwidom (author): Gut instinct from @AetherUnbound, if memory serves. I haven't been able to run the whole thing on my local machine, because of thermal throttling, but I did get through several hundred thousand (maybe a couple of million?) records locally.

Contributor: Gotcha. Is there a plan to monitor the resource consumption of this DAG when we first run it in production @AetherUnbound?

Contributor: That's right, our normal limit is 100, but since there's so much data here it seemed prudent to increase the batch volume on both ends. We will indeed be monitoring performance, and we'll be sure to kick this off for the first time when nothing else intensive is running on Airflow so we have the smallest likelihood of taking something down (CC @stacimc)

self.sql_template = (SCRIPT_DIR / "export_to_json.template.sql").read_text()

def get_next_query_params(self, prev_query_params=None, **kwargs):
if prev_query_params is None:
return {"offset_num": 0}
else:
next_offset = prev_query_params["offset_num"] + self.batch_limit
return {"offset_num": next_offset}

def get_response_json(self, query_params: Dict):
"""
Call the SQL to pull json from Postgres, where the raw data has been loaded.
"""
sql_string = self.sql_template.format(
batch_limit=self.batch_limit, offset_num=query_params["offset_num"]
)
sql_result = self.pg.get_records(sql_string)
# Postgres returns a list of tuples, even if it's one tuple with one item.
return sql_result[0][0]

def get_batch_data(self, response_json):
if response_json:
return response_json
return None

def get_record_data(self, data):
if data.get("foreign_identifier") is None:
# TO DO: maybe raise an error here or after a certain number of these?
# more info in https://github.com/WordPress/openverse-catalog/issues/684
return None
license_url = data.get("license_url")
license_info = get_license_info(license_url=license_url)
if license_info == LicenseInfo(None, None, None, None):
return None
record_data = {k: data[k] for k in data.keys() if k != "license_url"}
record_data["license_info"] = license_info
return record_data

def get_media_type(self, record):
# This provider only supports Images via S3, though they have some audio files
# on site and in the API.
return "image"

def endpoint(self):
raise NotImplementedError("Normalized TSV files from AWS S3 means no endpoint.")

@staticmethod
def compare_update_dates(
last_success: pendulum.DateTime | None, s3_keys: list, aws_conn_id=AWS_CONN_ID
):
# if it was never run, assume the data is new
if last_success is None:
return
s3 = S3Hook(aws_conn_id=aws_conn_id)
s3_client = s3.get_client_type()
for key in s3_keys:
# this will error out if the files don't exist, and bubble up as an
# informative failure
last_modified = s3_client.head_object(
Bucket="inaturalist-open-data", Key=key
)["LastModified"]
# if any file has been updated, let's pull them all
if last_success < last_modified:
return
# If no files have been updated, skip the DAG
raise AirflowSkipException("Nothing new to ingest")

@staticmethod
def create_preingestion_tasks():

with TaskGroup(group_id="preingestion_tasks") as preingestion_tasks:

check_for_file_updates = PythonOperator(
task_id="check_for_file_updates",
python_callable=INaturalistDataIngester.compare_update_dates,
op_kwargs={
"last_success": "{{ prev_start_date_success }}",
"s3_keys": [
f"{file_name}.csv.gz" for file_name in SOURCE_FILE_NAMES
],
},
)

create_inaturalist_schema = PostgresOperator(
task_id="create_inaturalist_schema",
postgres_conn_id=POSTGRES_CONN_ID,
sql=(SCRIPT_DIR / "create_schema.sql").read_text(),
)

with TaskGroup(group_id="load_source_files") as load_source_files:
for source_name in SOURCE_FILE_NAMES:
PostgresOperator(
task_id=f"load_{source_name}",
postgres_conn_id=POSTGRES_CONN_ID,
sql=(SCRIPT_DIR / f"{source_name}.sql").read_text(),
),

(check_for_file_updates >> create_inaturalist_schema >> load_source_files)

return preingestion_tasks

@staticmethod
def create_postingestion_tasks():
drop_inaturalist_schema = PostgresOperator(
task_id="drop_inaturalist_schema",
postgres_conn_id=POSTGRES_CONN_ID,
sql="DROP SCHEMA IF EXISTS inaturalist CASCADE",
)
return drop_inaturalist_schema
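
As a rough illustration (not the test fixtures this PR adds under `tests/`), the
ingester's pagination and foreign-ID guard can be exercised in isolation, assuming the
catalog's usual test setup where the `dags` directory is importable:

```python
# Illustrative sketch, not this PR's actual tests: exercise the offset pagination
# and the missing-foreign-identifier guard without Postgres or S3.
from unittest import mock

from providers.provider_api_scripts.inaturalist import INaturalistDataIngester


def _bare_ingester():
    # Skip __init__ so no Postgres hook or media stores are created.
    with mock.patch.object(INaturalistDataIngester, "__init__", return_value=None):
        ingester = INaturalistDataIngester()
    ingester.batch_limit = 10_000
    return ingester


def test_offset_pagination():
    ingester = _bare_ingester()
    assert ingester.get_next_query_params(None) == {"offset_num": 0}
    assert ingester.get_next_query_params({"offset_num": 0}) == {"offset_num": 10_000}


def test_record_without_foreign_id_is_dropped():
    ingester = _bare_ingester()
    assert ingester.get_record_data({"license_url": "https://example.com"}) is None
```
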
4 changes: 4 additions & 0 deletions openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql
@@ -0,0 +1,4 @@
CREATE SCHEMA IF NOT EXISTS inaturalist;
COMMIT;
SELECT schema_name
FROM information_schema.schemata WHERE schema_name = 'inaturalist';