This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add iNaturalist.org metadata #549

Merged · 98 commits · Aug 19, 2022
Changes from 17 commits

Commits (98)
0dc32e3
templated changes from existing script/readme
rwidom May 10, 2022
e44b2c8
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom May 16, 2022
23419ba
first stab at transformation using pandas
rwidom May 22, 2022
2c98dd1
Merge branch 'main' for thumbnails etc into issue/439-add-provider-iN…
rwidom May 26, 2022
0c2f67c
add capability to load .gz files from s3
rwidom May 28, 2022
7f357c0
load data from S3 and store transformation logic
rwidom May 30, 2022
3cd9db3
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jun 3, 2022
c7ebeba
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jun 12, 2022
faf6716
clean up old file structure and files
rwidom Jun 12, 2022
e78f9a3
add test source files
rwidom Jun 13, 2022
1a29f00
cleanup file structure
rwidom Jun 15, 2022
b145238
get sample data into minio container
rwidom Jun 21, 2022
fc2dc54
add sql transformation steps
rwidom Jun 21, 2022
d4b2a56
hide minio testing side effects from git
rwidom Jun 21, 2022
15d2d65
add python wrappers for airflow and testing
rwidom Jun 21, 2022
59ece62
fyi on the source of the sample data
rwidom Jun 21, 2022
0c1f65c
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jun 21, 2022
0c0fd9e
Minor tweaks to get this working locally
AetherUnbound Jun 22, 2022
067a16c
Update .gitignore
rwidom Jun 23, 2022
1053d3b
Linting
rwidom Jul 3, 2022
b1cc628
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 8, 2022
697b72e
add separate s3 load step to dev docker setup
rwidom Jul 8, 2022
89ce375
airflow loads s3 & calls data ingestor to save tsv
rwidom Jul 9, 2022
ab9f549
make bucket loading piece readable
rwidom Jul 9, 2022
3cab54c
remove unnecessary gitignore change
rwidom Jul 9, 2022
900de4f
put back basic minio volume
rwidom Jul 9, 2022
f72c251
fix count typo in testing comment
rwidom Jul 9, 2022
b54c4c5
linting sql files from sql fluff
rwidom Jul 10, 2022
59cf0b8
fixing linting capitalization error
rwidom Jul 12, 2022
4449811
strengthen file references
rwidom Jul 13, 2022
76111cc
move pg hook to init to reduce overhead
rwidom Jul 13, 2022
e4b61c4
Aggregate dependencies into reusable just recipe, add load_to_s3
AetherUnbound Jul 14, 2022
7fd1102
Add check file exists on S3 to DAG
rwidom Jul 14, 2022
bd1bd5f
Merge branch 'issue/439-add-provider-iNaturalist' of https://github.c…
rwidom Jul 14, 2022
2509db2
fix ingester class initialization override
rwidom Jul 15, 2022
ea335ba
stop overwriting pg temp files
rwidom Jul 15, 2022
10cc426
fix typo in order of callables
rwidom Jul 15, 2022
dcba976
dag runs with python pagination, though tests fail
rwidom Jul 15, 2022
115c7ae
sql for python pagination
rwidom Jul 15, 2022
93d576f
create and manage larger test data files
rwidom Jul 17, 2022
064e223
performant SQL to export JSON
rwidom Jul 17, 2022
464ffe8
landing url not langing url
rwidom Jul 17, 2022
b97b8c7
temp file name with timestamp for postgres
rwidom Jul 17, 2022
afb9cb3
inaturalist provider ingester + testing
rwidom Jul 17, 2022
0b38630
removed likely to error command
rwidom Jul 17, 2022
ca7564b
committing workflow for review, not complete
rwidom Jul 17, 2022
9581c12
use batch_limit
rwidom Jul 19, 2022
cff5c5d
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 20, 2022
2b5c53e
complete the workflow cycle
rwidom Jul 21, 2022
bfe7b31
make preingestion factory logic more concise
rwidom Jul 21, 2022
b15d989
check s3 file update date to run dag daily
rwidom Jul 21, 2022
bf452b4
testing with resource files instead of minio
rwidom Jul 23, 2022
7b0854a
more concise batch data testing for none
rwidom Jul 23, 2022
fe3d0c2
monthly run, checking if files have been updated
rwidom Jul 23, 2022
5b0356c
linting
rwidom Jul 23, 2022
52f3b13
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 23, 2022
f515bda
pass tags as a list
rwidom Jul 24, 2022
20b6218
list reformat linting
rwidom Jul 24, 2022
e2c1c9b
more readable bash command
rwidom Jul 27, 2022
1a1d940
inaturalist --> iNaturalist in class name
rwidom Jul 27, 2022
b2e1177
old_query_params --> prev_query_params
rwidom Jul 27, 2022
42d58e9
improved description for sample data file manager
rwidom Jul 27, 2022
95ac706
parametrize tests for missing data
rwidom Jul 27, 2022
e73c5ea
more concise / non-redundant get usage
rwidom Jul 27, 2022
2ac5dac
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 27, 2022
216614e
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 28, 2022
337c1d7
streamline checking for new data
rwidom Jul 28, 2022
212f91d
remove unnecessary logging and imports
rwidom Jul 28, 2022
4894f72
Merge 'main' -> issue/439-add-provider-iNaturalist
rwidom Jul 28, 2022
43fe04f
iNaturalistDataIngester -> INaturalistDataIngester
rwidom Jul 28, 2022
87515e0
don't need to make foreign ID a string up front
rwidom Jul 28, 2022
a263f31
clarify comment on observer FK
rwidom Jul 28, 2022
dd690d7
more readable use of sql template
rwidom Jul 28, 2022
788f69e
streamline transformations in postgres
rwidom Jul 29, 2022
989ce98
add test case with no ancestry info / tags
rwidom Jul 29, 2022
5f25ca9
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Jul 29, 2022
3bb7f65
linting test db response
rwidom Jul 29, 2022
772eb0b
remove env check because file won't be in prod
rwidom Jul 30, 2022
1db92b0
Add SQL tests
rwidom Aug 1, 2022
a32f1d6
remove universal tags
rwidom Aug 1, 2022
6c78765
adjusted db response without "life" tag
rwidom Aug 1, 2022
1bbcb6e
lint db response
rwidom Aug 1, 2022
b71fbdf
remove life from hard coded test results
rwidom Aug 1, 2022
4e20e32
drop source tables and schema after ingestion
rwidom Aug 1, 2022
e21fecb
nicer feedback on initial load test with pytest
rwidom Aug 2, 2022
0100a7e
try adding default image category
rwidom Aug 2, 2022
9eed1f9
Print logs on failure
AetherUnbound Aug 3, 2022
dce23fb
Revert "Print logs on failure"
AetherUnbound Aug 3, 2022
2a4074e
post-ingestion task comment
rwidom Aug 3, 2022
a6ff6df
clarify and simplify test
rwidom Aug 4, 2022
ce2df38
simplify sql names and remove dedent
rwidom Aug 9, 2022
dc6887c
clarify duplicate photo id status
rwidom Aug 12, 2022
ec53e94
lint fix
rwidom Aug 12, 2022
c039754
documentation for pre- and post-ingestion task params
rwidom Aug 12, 2022
f21af29
Merge branch 'main' into issue/439-add-provider-iNaturalist
rwidom Aug 13, 2022
6282c17
Update DAGs.md
AetherUnbound Aug 16, 2022
27506d8
add issue references to comments
rwidom Aug 19, 2022
44418e9
fix comment typo
rwidom Aug 19, 2022
7 changes: 7 additions & 0 deletions .gitignore
@@ -144,3 +144,10 @@ si_samples_*

# Vim detritus
*.swp

# minio testing detritus
/tests/s3-data/.minio.sys/
/tests/s3-data/commonsmapper/
/tests/s3-data/commonsmapper-v2/
/tests/s3-data/openverse-airflow-logs/
/tests/s3-data/openverse-storage/
17 changes: 15 additions & 2 deletions docker-compose.override.yml
@@ -27,8 +27,11 @@ services:
MINIO_ROOT_USER: ${AWS_ACCESS_KEY}
MINIO_ROOT_PASSWORD: ${AWS_SECRET_KEY}
# Comma separated list of buckets to create on startup
BUCKETS_TO_CREATE: ${OPENVERSE_BUCKET},openverse-airflow-logs,commonsmapper-v2,commonsmapper
BUCKETS_TO_CREATE: ${OPENVERSE_BUCKET},openverse-airflow-logs,commonsmapper-v2,commonsmapper,inaturalist-open-data
# Create the buckets on every container startup
# Is there something about creating the buckets in the entrypoint that might make them
# visible as buckets, as opposed to syncing them over from the volume? Doesn't seem likely,
# so maybe this could be dropped in favor of fixed empty subdirectories?
# Note: $0 is included in the exec because "/bin/bash -c" swallows the first
# argument, so it must be re-added at the beginning of the exec call
entrypoint: >-
@@ -39,7 +42,17 @@
exec $$0 \"$$@\""
command: minio server /data --address :5000 --console-address :5001
volumes:
- minio:/data
# Any buckets used for testing provider data imported from s3 should be subdirectories under
# /tests/s3-data/
# The trick to the issues here (https://wordpress.slack.com/archives/C02012JB00N/p1655118749223179)
# was to have a single volume, and to represent each bucket as a subdirectory under that.
# BUT, the test runs still can't see the inaturalist-open-data bucket, even though I have
# verified through docker that the files exist on the container.
# TO DO: The openverse user has root access to all buckets on this minio instance, which
# makes sense for openverse-owned buckets, but not for read-only buckets owned by providers.
# Maybe we add a minio/mc + minio/minio pair with more realistic permissions for buckets
# containing sample provider data?
- ./tests/s3-data:/data:rw
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5010/minio/health/live"]
interval: 30s
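One way to check what the local minio instance actually exposes (re the bucket-visibility question in the volumes comment above) is a quick boto3 listing. This is a sketch only: the endpoint port is an assumption based on this compose file and may need adjusting to the host port mapping.

    import os

    import boto3

    # Assumes the minio API is reachable from the host on port 5000, as in the
    # "minio server /data --address :5000" command above; adjust if your port
    # mapping differs. Credentials come from the same env vars the compose file uses.
    s3 = boto3.client(
        "s3",
        endpoint_url="http://localhost:5000",
        aws_access_key_id=os.environ["AWS_ACCESS_KEY"],
        aws_secret_access_key=os.environ["AWS_SECRET_KEY"],
    )
    # List every bucket minio reports, then the objects in the provider bucket;
    # this call errors if inaturalist-open-data is not visible as a bucket.
    print([b["Name"] for b in s3.list_buckets()["Buckets"]])
    for obj in s3.list_objects_v2(Bucket="inaturalist-open-data").get("Contents", []):
        print(obj["Key"], obj["Size"])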
6 changes: 5 additions & 1 deletion docker/local_postgres/0002_aws_s3_mock.sql
@@ -22,13 +22,17 @@ AS $$
).Object(bucket, file_path)
temp_location = '/tmp/postgres_loading.tsv'
s3_obj.download_file(temp_location)
if file_path[-3:]=='.gz':
copy_from = "PROGRAM 'gzip -dc "+temp_location+"'"
else:
copy_from = plpy.quote_literal(temp_location)
with open(temp_location) as f:
columns = '({})'.format(column_list) if column_list else ''
res = plpy.execute(
'COPY {} {} FROM {} {};'.format(
table_name,
columns,
plpy.quote_literal(temp_location),
copy_from,
options
)
)
1 change: 1 addition & 0 deletions openverse_catalog/dags/common/loader/provider_details.py
@@ -31,6 +31,7 @@
STOCKSNAP_DEFAULT_PROVIDER = "stocksnap"
WORDPRESS_DEFAULT_PROVIDER = "wordpress"
FREESOUND_DEFAULT_PROVIDER = "freesound"
INATURALIST_DEFAULT_PROVIDER = "inaturalist"

# Finnish parameters
FINNISH_SUB_PROVIDERS = {
26 changes: 26 additions & 0 deletions openverse_catalog/dags/providers/inaturalist_workflow.py
@@ -0,0 +1,26 @@
"""
This file configures the Apache Airflow DAG to (re)ingest Inaturalist data.
"""
# airflow DAG (necessary for Airflow to find this file)
from datetime import datetime
import logging

from providers.provider_api_scripts import inaturalist
from common.provider_dag_factory import create_provider_api_workflow


logging.basicConfig(
    format='%(asctime)s: [%(levelname)s - DAG Loader] %(message)s',
    level=logging.DEBUG)
logger = logging.getLogger(__name__)

DAG_ID = "inaturalist_workflow"

globals()[DAG_ID] = create_provider_api_workflow(
    DAG_ID,
    inaturalist.main,
    start_date=datetime(1970, 1, 1),
    max_active_tasks=1,
    schedule_string='@monthly',
    dated=False,
)
111 changes: 111 additions & 0 deletions openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py
@@ -0,0 +1,111 @@
## There are a bunch of bigger picture notes below, but right now, this is failing
## in the dev environment in that it can't access the sample files in minio.
## All of the other s3 tests run fine in my dev environment though.

"""
Content Provider: Inaturalist

ETL Process: With Inaturalist, for reasons described below, we aren't really doing ETL, but ELT.
Part of the challenge is to figure out how much to try to fit an ELT process into
code developed for ETL. The compromise here is a lot of SQL wrapped in Python.

Another approach to consider would be an open source tool developed specifically for
SQL transformations -- dbt. Reading this article -- https://docs.getdbt.com/blog/dbt-airflow-spiritual-alignment
-- made me wonder if a proof of concept with Inaturalist data might be worthwhile.
dbt core is fully open source, and assuming that there will be more ELT providers,
it could be worthwhile.

Output: This had been "TSV file containing the media metadata." but I'm not 100% sure that
that makes sense given the whole ELT vs ETL thing.

Notes: The inaturalist API is not intended for data scraping.
https://api.inaturalist.org/v1/docs/
But there is a full dump intended for sharing on S3.
https://github.com/inaturalist/inaturalist-open-data/tree/documentation/Metadata
Because these are very large normalized tables, as opposed to more document oriented API
responses, we found that bringing the data into postgres first was the most effective approach.
More detail in slack here:
https://wordpress.slack.com/archives/C02012JB00N/p1653145643080479?thread_ts=1653082292.714469&cid=C02012JB00N
This uses the structure defined here, except for adding ancestry tags to the taxa table:
https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql

TO DO: There is nothing here that actually updates the images table.
Need to figure out a stand-in for metrics from the saved json counter

"""
import json
import os
import logging
from pathlib import Path
from urllib.parse import urlparse

from textwrap import dedent
import psycopg2

from common.loader import provider_details as prov
from common.storage.image import ImageStore


logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

PROVIDER = prov.INATURALIST_DEFAULT_PROVIDER
SCRIPT_DIR = '/usr/local/airflow/openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/'

# This is set up here as a single-threaded linear process, to be more consistent with the
# structure of the provider dag factory, but if I were to start from scratch with airflow,
# each of these would be a task, and the dag would be 00 >> [01, 02, 03, 04] >> 05.
# In dbt, the schema would be created separately, and steps 1-5 would each be a "model"
# and dbt would navigate the dependencies more or less automagically.
Contributor:
Good call! You could definitely manage this using @AetherUnbound's suggestion to take the DAG and build a TaskGroup that can be set as an upstream task dependency.
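A rough sketch of that TaskGroup idea, assuming Airflow 2's TaskGroup and PostgresOperator and a DAG whose template_searchpath points at the SQL directory; the connection id and task names here are illustrative assumptions, not part of this PR:

    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.task_group import TaskGroup


    def create_inaturalist_load_task_group(dag):
        # Group the schema creation, the four table loads, and the final TSV step
        # so the whole group can be set as an upstream dependency in the provider DAG.
        with TaskGroup(group_id="load_inaturalist", dag=dag) as load_group:
            create_schema = PostgresOperator(
                task_id="create_schema",
                postgres_conn_id="postgres_openledger_testing",
                sql="00_create_schema.sql",
                dag=dag,
            )
            load_tables = [
                PostgresOperator(
                    task_id=file_name.replace(".sql", ""),
                    postgres_conn_id="postgres_openledger_testing",
                    sql=file_name,
                    dag=dag,
                )
                for file_name in [
                    "01_photos.sql",
                    "02_observations.sql",
                    "03_taxa.sql",
                    "04_observers.sql",
                ]
            ]
            final_tsv = PostgresOperator(
                task_id="final_tsv",
                postgres_conn_id="postgres_openledger_testing",
                sql="05_final_tsv.sql",
                dag=dag,
            )
            # 00 runs first, the four loads can run in parallel, then 05 joins them.
            create_schema >> load_tables >> final_tsv
        return load_group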

Collaborator (Author):
Is it possible that if I did everything with Airflow hooks the boto issue I'm seeing would go away? I vaguely remember that airflow also has an s3 --> postgres method somewhere.
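A hedged sketch of routing the load through Airflow hooks instead of raw boto3, using S3Hook to fetch the dump and PostgresHook.copy_expert to COPY it into the staging table; the connection ids, key names, and decompression step are illustrative assumptions rather than this PR's code:

    import gzip
    import shutil

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.postgres.hooks.postgres import PostgresHook


    def load_gz_from_s3_to_postgres(bucket="inaturalist-open-data", key="photos.csv.gz"):
        # download_file returns the path of a local temporary copy of the object
        local_gz = S3Hook(aws_conn_id="aws_default").download_file(key=key, bucket_name=bucket)
        local_tsv = local_gz + ".tsv"
        with gzip.open(local_gz, "rb") as source, open(local_tsv, "wb") as target:
            shutil.copyfileobj(source, target)
        pg = PostgresHook(postgres_conn_id="postgres_openledger_testing")
        # copy_expert streams the local file through COPY ... FROM STDIN,
        # so it does not need server-side file access or superuser rights.
        pg.copy_expert(
            "COPY inaturalist.photos FROM STDIN "
            "WITH (FORMAT csv, DELIMITER E'\\t', HEADER, QUOTE E'\\b')",
            local_tsv,
        )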

LOAD_PROCESS = [
    '00_create_schema.sql',
    '01_photos.sql',
    '02_observations.sql',
    '03_taxa.sql',
    '04_observers.sql',
    '05_final_tsv.sql'
]
CONNECTION_ID = os.getenv("AIRFLOW_CONN_POSTGRES_OPENLEDGER_TESTING")


def run_sql_file(file_name, file_path=SCRIPT_DIR, conn_id=CONNECTION_ID):
"""
The process is really written in SQL so this script just enables logging
and monitoring jobs, but this is the basic function to run the SQL files
for each step.
"""
logger.info(f"Running {file_name} using DB connection {conn_id}")
result = 'SQL failed. See log for details.'
try:
assert file_name[-4:]=='.sql'
assert os.path.exists(file_path + file_name)
db = psycopg2.connect(conn_id)
cursor = db.cursor()
query = dedent(open(file_path + file_name, 'r').read())
cursor.execute(query)
result = cursor.fetchall()
db.commit()
logger.info("Success!")
except Exception as e:
logger.warning(f"SQL step failed due to {e}")
return result


def main():
"""
This is really just looping through the SQL steps defined above, with some additional logging.
"""
logger.info("Begin: Inaturalist script")

for f in LOAD_PROCESS:
image_count = run_sql_file(f)
logger.info(f"Results: {str(image_count)}")
logger.info(f"Total images pulled: {image_count}")
logger.info('Terminated!')


if __name__ == '__main__':
    main()
2 changes: 2 additions & 0 deletions openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/00_create_schema.sql
@@ -0,0 +1,2 @@
CREATE SCHEMA IF NOT EXISTS inaturalist;
select schema_name from information_schema.schemata where schema_name='inaturalist';
39 changes: 39 additions & 0 deletions openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/01_photos.sql
@@ -0,0 +1,39 @@
/*
********************************************************************************
PHOTOS
********************************************************************************
-- not using an FK constraint on observer_id to save load time, but the 5/30/2022 dataset does have complete observer ids
-- not using an FK constraint on observation_uuid to save load time, but the 5/30/2022 dataset does have complete observation uuids
-- photo_id is not unique. There are ~130,000 photo_ids that appear more than once, maybe because an earlier version of
   the photo was deleted (?). It's unclear, but for now I'm going to assume that they will be taken care of later in the
   processing of these data. It does mean that we can't add a unique index, and things will go slower when selecting on photo_id.
   The documentation suggests indexing on photo UUID, but the AWS files of the actual photos are stored under photo_id.

Taking DDL from https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql
*/

DROP TABLE IF EXISTS inaturalist.photos;
commit;

CREATE TABLE inaturalist.photos (
photo_uuid uuid NOT NULL,
photo_id integer NOT NULL,
observation_uuid uuid NOT NULL,
observer_id integer,
extension character varying(5),
license character varying(255),
width smallint,
height smallint,
position smallint
);

select aws_s3.table_import_from_s3('inaturalist.photos',
'',
'(FORMAT ''csv'', DELIMITER E''\t'', HEADER, QUOTE E''\b'')',
'inaturalist-open-data',
'photos.csv.gz',
'us-east-1');

CREATE INDEX index_inaturalist_photo_id ON inaturalist.photos (photo_id);

select count(*) from inaturalist.photos;
35 changes: 35 additions & 0 deletions openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/02_observations.sql
@@ -0,0 +1,35 @@
/*
********************************************************************************
OBSERVATIONS
********************************************************************************
-- ~400,000 observations do not have a taxon_id that is in the taxa table.
-- Their photos are not included in the final transformed view on the assumption
that photos are not useful to us without a title or tags

Taking DDL from https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql
*/

DROP TABLE IF EXISTS inaturalist.observations;
commit;

CREATE TABLE inaturalist.observations (
observation_uuid uuid,
observer_id integer,
latitude numeric(15,10),
longitude numeric(15,10),
positional_accuracy integer,
taxon_id integer,
quality_grade character varying(255),
observed_on date
);

select aws_s3.table_import_from_s3('inaturalist.observations',
'',
'(FORMAT ''csv'', DELIMITER E''\t'', HEADER, QUOTE E''\b'')',
'inaturalist-open-data',
'observations.csv.gz',
'us-east-1');

ALTER TABLE inaturalist.observations ADD PRIMARY KEY (observation_uuid);

select count(*) from inaturalist.observations;
58 changes: 58 additions & 0 deletions openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/03_taxa.sql
@@ -0,0 +1,58 @@
/*
********************************************************************************
TAXA
********************************************************************************

Taking DDL from https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql
Plus adding a field for ancestry tags.
*/

DROP TABLE IF EXISTS inaturalist.taxa;
commit;

CREATE TABLE inaturalist.taxa (
taxon_id integer,
ancestry character varying(255),
rank_level double precision,
rank character varying(255),
name character varying(255),
active boolean,
tags text
);

-- Load from S3
select aws_s3.table_import_from_s3('inaturalist.taxa',
'taxon_id, ancestry, rank_level, rank, name, active',
'(FORMAT ''csv'', DELIMITER E''\t'', HEADER, QUOTE E''\b'')',
'inaturalist-open-data',
'taxa.csv.gz',
'us-east-1');

-- doing this after the load to help performance, but will need a way to
-- handle non-uniqueness if it comes up
ALTER TABLE inaturalist.taxa ADD PRIMARY KEY (taxon_id);

-- Aggregate ancestry names as tags
create temporary table unnest_ancestry as
(
SELECT
unnest(string_to_array(ancestry, '/'))::int as linked_taxon_id,
taxon_id
FROM inaturalist.taxa
);

create temporary table taxa_tags as
(
select u.taxon_id, STRING_AGG(taxa.name, '; ') as tags
from unnest_ancestry as u
join inaturalist.taxa on u.linked_taxon_id = taxa.taxon_id
where taxa.rank not in ('kingdom', 'stateofmatter')
group by u.taxon_id
);

update inaturalist.taxa
set tags = taxa_tags.tags
from taxa_tags -- temporary table created above, not in the inaturalist schema
where taxa_tags.taxon_id = taxa.taxon_id;

select count(*) from inaturalist.taxa;
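The aggregation above turns each taxon's ancestry string into a tags list; a small hypothetical illustration (made-up taxon ids and names) of what that produces:

    # Hypothetical rows: taxon_id -> (name, rank)
    taxa = {
        1: ("Life", "stateofmatter"),
        2: ("Animalia", "kingdom"),
        3: ("Chordata", "phylum"),
        4: ("Aves", "class"),
    }
    ancestry = "1/2/3/4"  # ancestry string stored on some descendant taxon

    # Split the ancestry, look up each ancestor's name, and drop the
    # 'kingdom' and 'stateofmatter' rows, exactly as the SQL does.
    tags = "; ".join(
        taxa[int(t)][0]
        for t in ancestry.split("/")
        if taxa[int(t)][1] not in ("kingdom", "stateofmatter")
    )
    print(tags)  # -> "Chordata; Aves"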
28 changes: 28 additions & 0 deletions openverse_catalog/dags/providers/provider_csv_load_scripts/inaturalist/04_observers.sql
@@ -0,0 +1,28 @@
/*
********************************************************************************
OBSERVERS
********************************************************************************

Taking DDL from https://github.com/inaturalist/inaturalist-open-data/blob/main/Metadata/structure.sql
*/

DROP TABLE IF EXISTS inaturalist.observers;
commit;

CREATE TABLE inaturalist.observers (
observer_id integer,
login character varying(255),
name character varying(255)
);

select aws_s3.table_import_from_s3('inaturalist.observers',
'',
'(FORMAT ''csv'', DELIMITER E''\t'', HEADER, QUOTE E''\b'')',
'inaturalist-open-data',
'observers.csv.gz',
'us-east-1');

-- doing this after the load to help performance, but will need a way to handle non-uniqueness
ALTER TABLE inaturalist.observers ADD PRIMARY KEY (observer_id);

select count(*) from inaturalist.observers;