
Add new 2.0 functionality, closes neuml#17, closes neuml#36, closes neuml#37, closes neuml#38, closes neuml#39
davidmezzetti committed Dec 5, 2021
1 parent 1303f64 commit 055dc1a
Showing 19 changed files with 607 additions and 294 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
@@ -25,7 +25,6 @@ jobs:
pip install -U pip wheel coverage coveralls
pip install .
python -c "import nltk; nltk.download('punkt')"
pip install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.2.5/en_core_sci_md-0.2.5.tar.gz
python --version
make data coverage
env:
7 changes: 3 additions & 4 deletions src/python/paperetl/cord19/__main__.py
@@ -7,11 +7,10 @@
from .execute import Execute

if __name__ == "__main__":
if len(sys.argv) > 1:
if len(sys.argv) > 2:
Execute.run(
sys.argv[1],
sys.argv[2] if len(sys.argv) > 2 else None,
sys.argv[2],
sys.argv[3] if len(sys.argv) > 3 else None,
sys.argv[4] == "True" if len(sys.argv) > 4 else True,
sys.argv[5] if len(sys.argv) > 5 else None,
sys.argv[4] == "True" if len(sys.argv) > 4 else False,
)
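A hedged sketch of what the updated argument handling above maps to when called programmatically; all paths are illustrative and correspond to passing the same values as positional command line arguments to python -m paperetl.cord19:

    from paperetl.cord19.execute import Execute

    # Equivalent to: python -m paperetl.cord19 cord19/data cord19/models entry-dates.csv True
    # (paths and filenames are illustrative)
    Execute.run(
        "cord19/data",       # indir: directory containing the CORD-19 metadata.csv
        "cord19/models",     # url: database url, passed to Factory (non-prefixed urls are treated as SQLite)
        "entry-dates.csv",   # entryfile: optional entry dates file
        True,                # replace: recreate the database instead of appending
    )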
38 changes: 12 additions & 26 deletions src/python/paperetl/cord19/execute.py
@@ -128,14 +128,13 @@ def getTags(sections):
return tags

@staticmethod
def stream(indir, dates, merge):
def stream(indir, dates):
"""
Generator that yields rows from a metadata.csv file. The directory is also included.
Args:
indir: input directory
dates: list of uid - entry dates for current metadata file
merge: only merges/processes this list of uids, if enabled
"""

# Filter out duplicate ids
@@ -152,15 +151,9 @@ def stream(indir, dates, merge):
sha = Execute.getHash(row)

# Only process if all conditions below met:
# - Merge set to None (must check for None as merge can be an empty set) or uid in list of ids to merge
# - cord uid in entry date mapping
# - cord uid and sha hash not already processed
if (
(merge is None or uid in merge)
and uid in dates
and uid not in ids
and sha not in hashes
):
if uid in dates and uid not in ids and sha not in hashes:
yield (row, indir)

# Add uid and sha as processed
@@ -210,7 +203,7 @@ def process(params):
Execute.getUrl(row),
)

return Article(metadata, sections, None)
return Article(metadata, sections)

@staticmethod
def entryDates(indir, entryfile):
@@ -251,56 +244,49 @@ def entryDates(indir, entryfile):

# Store date if cord uid maps to value in entries
if row["cord_uid"] == uid:
dates[uid] = date
dates[uid] = parser.parse(date)

return dates

@staticmethod
def run(indir, url, entryfile, full, merge):
def run(indir, url, entryfile=None, replace=False):
"""
Main execution method.
Args:
indir: input directory
url: database url
entryfile: path to entry dates file
full: full database load if True, only loads tagged articles if False
merge: database url to use for merging prior results
replace: if true, a new database will be created, overwriting any existing database
"""

print(f"Building articles database from {indir}")

# Set database url
if not url:
url = os.path.join(os.path.expanduser("~"), ".cord19", "models")

# Create database
db = Factory.create(url)
db = Factory.create(url, replace)

# Load entry dates
dates = Execute.entryDates(indir, entryfile)

# Merge existing db, if present
if merge:
merge = db.merge(merge, dates)
print("Merged results from existing articles database")

# Create process pool
with Pool(os.cpu_count()) as pool:
for article in pool.imap(
Execute.process, Execute.stream(indir, dates, merge), 100
Execute.process, Execute.stream(indir, dates), 100
):
# Get unique id
uid = article.uid()

# Only load untagged rows if this is a full database load
if full or article.tags():
if article.tags():
# Append entry date
article.metadata = article.metadata + (dates[uid],)

# Save article
db.save(article)

pool.close()
pool.join()

# Complete processing
db.complete()

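As a hedged illustration of the entryDates change above, entry dates are now parsed into datetime objects with dateutil instead of being stored as raw strings; the date value below is an example only:

    from dateutil import parser

    # parser.parse converts the metadata date string into a datetime,
    # so entry dates can be compared and sorted chronologically
    date = parser.parse("2020-03-27")
    print(date)        # 2020-03-27 00:00:00
    print(date.year)   # 2020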
16 changes: 0 additions & 16 deletions src/python/paperetl/database.py
@@ -8,22 +8,6 @@ class Database:
Defines data structures and methods to store article content.
"""

# pylint: disable=W0613
def merge(self, url, ids):
"""
Merges the results of an existing database into the current database. This method returns
a list of ids not merged, which means there is a newer version available in the source data.
Args:
url: database connection
ids: dict of id - entry date
Returns:
list of eligible ids NOT merged
"""

return []

def save(self, article):
"""
Saves an article.
15 changes: 12 additions & 3 deletions src/python/paperetl/elastic.py
@@ -22,12 +22,13 @@ class Elastic(Database):
"mappings": {"properties": {"sections": {"type": "nested"}}},
}

def __init__(self, url):
def __init__(self, url, replace):
"""
Connects and initializes an elasticsearch instance.
Args:
url: elasticsearch url
replace: If database should be recreated
"""

# Connect to ES instance
@@ -39,8 +40,16 @@ def __init__(self, url):
# Buffered actions
self.buffer = []

# Create index if it doesn't exist
if not self.connection.indices.exists("articles"):
# Check if index exists
exists = self.connection.indices.exists("articles")

# Delete if replace enabled
if exists and replace:
self.connection.indices.delete("articles")
exists = False

# Create if necessary
if not exists:
self.connection.indices.create("articles", Elastic.ARTICLES)

def save(self, article):
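A hedged usage sketch of the new replace flag; the Elasticsearch URL is illustrative and assumes a local instance:

    from paperetl.elastic import Elastic

    # Drop and recreate the articles index before loading
    db = Elastic("http://localhost:9200", replace=True)

    # Keep and append to an existing articles index
    db = Elastic("http://localhost:9200", replace=False)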
7 changes: 4 additions & 3 deletions src/python/paperetl/factory.py
@@ -13,25 +13,26 @@ class Factory:
"""

@staticmethod
def create(url):
def create(url, replace):
"""
Creates a new database connection.
Args:
url: connection url
replace: if true, a new database will be created, overwriting any existing database
Returns:
Database
"""

if url.startswith("http://"):
return Elastic(url)
return Elastic(url, replace)
if url.startswith("json://"):
return JSON(url.replace("json://", ""))
if url.startswith("yaml://"):
return YAML(url.replace("yaml://", ""))
if url:
# If URL is present, assume it's SQLite
return SQLite(url.replace("sqlite://", ""))
return SQLite(url.replace("sqlite://", ""), replace)

return None
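A hedged sketch of how the url prefix selects a backend and how the new replace flag is passed through; urls and paths are illustrative:

    from paperetl.factory import Factory

    es = Factory.create("http://localhost:9200", True)   # Elasticsearch, recreate index
    js = Factory.create("json://output", False)          # JSON file output
    ym = Factory.create("yaml://output", False)          # YAML file output
    db = Factory.create("sqlite://output", True)         # SQLite, recreate database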
5 changes: 4 additions & 1 deletion src/python/paperetl/file/__main__.py
@@ -9,5 +9,8 @@
if __name__ == "__main__":
if len(sys.argv) > 2:
Execute.run(
sys.argv[1], sys.argv[2], sys.argv[3] if len(sys.argv) > 3 else None
sys.argv[1],
sys.argv[2],
sys.argv[3] if len(sys.argv) > 3 else None,
sys.argv[4] == "True" if len(sys.argv) > 4 else False,
)
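A hedged programmatic equivalent of the argument handling above for the file ETL; paths are illustrative and the import path is assumed to mirror the cord19 module layout:

    from paperetl.file.execute import Execute

    # Third argument (None here) is optional and passed through unchanged;
    # the final True enables replace, recreating the target database
    Execute.run("file/data", "file/models", None, True)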
166 changes: 166 additions & 0 deletions src/python/paperetl/file/arx.py
@@ -0,0 +1,166 @@
"""
arXiv XML processing module
"""

import hashlib
import re

from bs4 import BeautifulSoup
from dateutil import parser
from nltk.tokenize import sent_tokenize

from ..schema.article import Article
from ..text import Text


class ARX:
"""
Methods to transform arXiv XML into article objects.
"""

@staticmethod
def parse(stream, source):
"""
Parses a XML datastream and yields processed articles.
Args:
stream: handle to input data stream
source: text string describing stream source, can be None
config: path to config directory
"""

# Parse XML
soup = BeautifulSoup(stream, "lxml")

# Process each entry
for entry in soup.find_all("entry"):
reference = ARX.get(entry, "id")
title = ARX.get(entry, "title")
published = parser.parse(ARX.get(entry, "published").split("T")[0])
updated = parser.parse(ARX.get(entry, "updated").split("T")[0])

# Derive uid
uid = hashlib.sha1(reference.encode("utf-8")).hexdigest()

# Get journal reference
journal = ARX.get(entry, "arxiv:journal_ref")

# Get authors
authors, affiliations, affiliation = ARX.authors(entry.find_all("author"))

# Get tags
tags = "; ".join(
["ARX"]
+ [category.get("term") for category in entry.find_all("category")]
)

# Transform section text
sections = ARX.sections(title, ARX.get(entry, "summary"))

# Article metadata - id, source, published, publication, authors, affiliations, affiliation, title,
# tags, reference, entry date
metadata = (
uid,
source,
published,
journal,
authors,
affiliations,
affiliation,
title,
tags,
reference,
updated,
)

yield Article(metadata, sections)

@staticmethod
def get(element, path):
"""
Finds the first matching path in element and returns the element text.
Args:
element: XML element
path: path expression
Returns:
string
"""

element = element.find(path)
return ARX.clean(element.text) if element else None

@staticmethod
def clean(text):
"""
Removes newlines and extra spacing from text.
Args:
text: text to clean
Returns:
clean text
"""

# Remove newlines and cleanup spacing
text = text.replace("\n", " ")
return re.sub(r"\s+", " ", text).strip()

@staticmethod
def authors(elements):
"""
Parses authors and associated affiliations from the article.
Args:
elements: authors elements
Returns:
(semicolon separated list of authors, semicolon separated list of affiliations, primary affiliation)
"""

authors = []
affiliations = []

for author in elements:
# Create authors as lastname, firstname
name = ARX.get(author, "name")
authors.append(", ".join(name.rsplit(maxsplit=1)[::-1]))

# Add affiliations
affiliations.extend(
[
ARX.clean(affiliation.text)
for affiliation in author.find_all("arxiv:affiliation")
]
)

return (
"; ".join(authors),
"; ".join(dict.fromkeys(affiliations)),
affiliations[-1] if affiliations else None,
)

@staticmethod
def sections(title, text):
"""
Gets a list of sections for this article.
Args:
title: title string
text: summary text
Returns:
list of sections
"""

# Add title
sections = [("TITLE", title)]

# Transform and clean text
text = Text.transform(text)

# Split text into sentences, transform text and add to sections
sections.extend([("ABSTRACT", x) for x in sent_tokenize(text)])

return sections
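A hedged, self-contained sketch of running a minimal Atom-style entry through ARX.parse; the feed content is synthetic, and it assumes lxml and the NLTK punkt tokenizer are installed as set up in the build workflow above:

    from paperetl.file.arx import ARX

    # Minimal synthetic feed entry (illustrative values only)
    xml = """
    <feed xmlns:arxiv="http://arxiv.org/schemas/atom">
      <entry>
        <id>http://arxiv.org/abs/2101.00001v1</id>
        <title>An example paper title</title>
        <published>2021-01-05T00:00:00Z</published>
        <updated>2021-01-07T00:00:00Z</updated>
        <summary>First sentence of the abstract. Second sentence of the abstract.</summary>
        <author>
          <name>Jane Doe</name>
          <arxiv:affiliation>Example University</arxiv:affiliation>
        </author>
        <category term="cs.CL"></category>
      </entry>
    </feed>
    """

    for article in ARX.parse(xml, "arXiv"):
        # metadata order: uid, source, published, journal, authors, affiliations,
        # affiliation, title, tags, reference, entry date
        print(article.uid())
        print(article.metadata)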