Skip to content

Commit

Permalink
Add function to get related media items
Browse files Browse the repository at this point in the history
  • Loading branch information
dseomn committed Oct 28, 2023
1 parent 215a6f8 commit 47353c6
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 1 deletion.
113 changes: 113 additions & 0 deletions rock_paper_sand/wikidata.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.
"""Code that uses Wikidata's APIs."""

import collections
from collections.abc import Generator, Iterable, Sequence, Set
import contextlib
import dataclasses
import datetime
import re
import typing
Expand Down Expand Up @@ -188,6 +190,41 @@ def _parse_sparql_result_string(term: Any) -> str:
return term["value"]


@dataclasses.dataclass(frozen=True, kw_only=True)
class RelatedMedia:
    """Media or media groups related to a media item.

    Instances are immutable (frozen) and every field must be passed by keyword.

    Attributes:
        parents: Parents of the item, e.g., a book series that the item is
            included in.
        siblings: Siblings of the item, e.g., sequels and prequels.
        children: Children of the item, e.g., a book that the book series item
            includes.
        loose: More loosely related items, e.g., a work that the item was based
            on but is not necessarily a sequel to.
    """

    parents: Set[wikidata_value.Item]
    siblings: Set[wikidata_value.Item]
    children: Set[wikidata_value.Item]
    loose: Set[wikidata_value.Item]


# Property groups used by Api.related_media() to classify related items.
#
# Parent properties point from an item to something that contains it.
# related_media() follows them forwards to find parents and backwards to find
# children.
_PARENT_PROPERTIES = (
    wikidata_value.P_PART_OF,
    wikidata_value.P_PART_OF_THE_SERIES,
)
# Sibling properties are followed in both directions, so an item counts as a
# sibling regardless of which side the statement is recorded on.
_SIBLING_PROPERTIES = (
    wikidata_value.P_FOLLOWED_BY,
    wikidata_value.P_FOLLOWS,
)
# Child properties point from an item to something it contains.
# related_media() follows them forwards to find children and backwards to find
# parents.
_CHILD_PROPERTIES = (wikidata_value.P_HAS_PARTS,)
# Loose properties are also followed in both directions.
_LOOSE_PROPERTIES = (
    wikidata_value.P_BASED_ON,
    wikidata_value.P_DERIVATIVE_WORK,
)


class Api:
"""Wrapper around Wikidata APIs."""

Expand All @@ -204,6 +241,7 @@ def __init__(
self._transitive_subclasses: (
dict[wikidata_value.Item, Set[wikidata_value.Item]]
) = {}
self._related_media: dict[wikidata_value.Item, RelatedMedia] = {}

def item(self, item_id: wikidata_value.Item) -> Any:
"""Returns an item in full JSON format."""
Expand Down Expand Up @@ -254,6 +292,81 @@ def transitive_subclasses(
)
return self._transitive_subclasses[class_id]

def related_media(self, item_id: wikidata_value.Item) -> RelatedMedia:
    """Returns the media items related to the given item.

    The first call for an item runs a single SPARQL query and caches the
    outcome; later calls for the same item return the cached value. Each
    related item's "instance of" classes returned by that query are also
    recorded in the item-classes cache, unless an entry already exists.

    Args:
        item_id: Item to find related media for.

    Returns:
        The item's parents, siblings, children, and loosely related items.

    Raises:
        ValueError: The query returned a relation name other than "parent",
            "sibling", "child", or "loose".
    """
    try:
        return self._related_media[item_id]
    except KeyError:
        pass  # Not cached yet; compute it below.

    # Property path alternatives for each relation. A leading "^" inverts a
    # property, so e.g. "X has parts <item>" also makes X a parent of item.
    parent_paths = [f"wdt:{p.id}" for p in _PARENT_PROPERTIES]
    parent_paths += [f"^wdt:{p.id}" for p in _CHILD_PROPERTIES]
    sibling_paths = [f"wdt:{p.id}|^wdt:{p.id}" for p in _SIBLING_PROPERTIES]
    child_paths = [f"wdt:{p.id}" for p in _CHILD_PROPERTIES]
    child_paths += [f"^wdt:{p.id}" for p in _PARENT_PROPERTIES]
    loose_paths = [f"wdt:{p.id}|^wdt:{p.id}" for p in _LOOSE_PROPERTIES]
    predicate_by_relation = {
        "parent": "|".join(parent_paths),
        "sibling": "|".join(sibling_paths),
        "child": "|".join(child_paths),
        "loose": "|".join(loose_paths),
    }

    # One UNION branch per relation; each branch tags its rows with the
    # relation's name so the rows can be grouped afterwards.
    union_branches = []
    for relation, predicate in predicate_by_relation.items():
        union_branches.append(
            "{ "
            f"wd:{item_id.id} ({predicate}) ?item. "
            f'BIND ("{relation}" AS ?relation) '
            "}"
        )
    instance_of = wikidata_value.P_INSTANCE_OF.id
    query_parts = [
        "SELECT REDUCED ?item ?relation ?class WHERE {",
        " UNION ".join(union_branches),
        f"OPTIONAL {{ ?item wdt:{instance_of} ?class. }}",
        "}",
    ]
    rows = self.sparql(" ".join(query_parts))

    classes_by_item: (
        collections.defaultdict[
            wikidata_value.Item, set[wikidata_value.Item]
        ]
    ) = collections.defaultdict(set)
    items_by_relation: (
        collections.defaultdict[str, set[wikidata_value.Item]]
    ) = collections.defaultdict(set)
    for row in rows:
        found_item = _parse_sparql_result_item(row["item"])
        # Looking the item up first registers it with an empty class set
        # even when the row carries no class.
        found_classes = classes_by_item[found_item]
        if "class" in row:
            found_classes.add(_parse_sparql_result_item(row["class"]))
        relation_name = _parse_sparql_result_string(row["relation"])
        items_by_relation[relation_name].add(found_item)

    for found_item, found_classes in classes_by_item.items():
        self._item_classes.setdefault(found_item, frozenset(found_classes))

    # Pop each known relation; anything left over is unexpected.
    field_to_relation = (
        ("parents", "parent"),
        ("siblings", "sibling"),
        ("children", "child"),
        ("loose", "loose"),
    )
    media = RelatedMedia(
        **{
            field: frozenset(items_by_relation.pop(relation_key, ()))
            for field, relation_key in field_to_relation
        }
    )
    if items_by_relation:
        raise ValueError(
            "Unexpected media relation types: "
            f"{list(items_by_relation)}"
        )

    self._related_media[item_id] = media
    return media


def _release_status(
item: Any,
Expand Down
120 changes: 119 additions & 1 deletion rock_paper_sand/wikidata_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# pylint: disable=missing-module-docstring

from collections.abc import Mapping, Sequence
from collections.abc import Collection, Mapping, Sequence
import datetime
from typing import Any
from unittest import mock
Expand Down Expand Up @@ -197,6 +197,124 @@ def test_transitive_subclasses(self) -> None:
# Note that this only happens once because the second time is cached.
self._mock_session.get.assert_called_once()

# Checks related_media() grouping, its caching, and the item classes it
# caches as a side effect, using a mocked SPARQL response.
@parameterized.named_parameters(
    dict(
        testcase_name="no_results",
        sparql_results=[],
        expected_result=dict(
            parents=(),
            siblings=(),
            children=(),
            loose=(),
        ),
        expected_cached_classes={},
    ),
    dict(
        testcase_name="with_results",
        sparql_results=[
            # No "class" binding: Q2 should still get an (empty) class set.
            {
                "item": _sparql_item("Q2"),
                "relation": _sparql_string("parent"),
            },
            {
                "item": _sparql_item("Q3"),
                "relation": _sparql_string("sibling"),
                "class": _sparql_item("Q31"),
            },
            # Exact duplicate of the previous binding: must not change the
            # result.
            {
                "item": _sparql_item("Q3"),
                "relation": _sparql_string("sibling"),
                "class": _sparql_item("Q31"),
            },
            # Same item and relation with a second class: both classes
            # should be collected for Q3.
            {
                "item": _sparql_item("Q3"),
                "relation": _sparql_string("sibling"),
                "class": _sparql_item("Q32"),
            },
            {
                "item": _sparql_item("Q4"),
                "relation": _sparql_string("child"),
            },
            {
                "item": _sparql_item("Q5"),
                "relation": _sparql_string("child"),
            },
            # Q4 again under a different relation: an item may appear in
            # more than one group.
            {
                "item": _sparql_item("Q4"),
                "relation": _sparql_string("loose"),
            },
        ],
        expected_result=dict(
            parents=("Q2",),
            siblings=("Q3",),
            children=("Q4", "Q5"),
            loose=("Q4",),
        ),
        expected_cached_classes={
            "Q2": (),
            "Q3": ("Q31", "Q32"),
            "Q4": (),
            "Q5": (),
        },
    ),
)
def test_related_media(
    self,
    *,
    sparql_results: list[Any],
    expected_result: Mapping[str, Collection[str]],
    expected_cached_classes: Mapping[str, Collection[str]],
) -> None:
    self._mock_session.get.return_value.json.return_value = {
        "results": {"bindings": sparql_results}
    }

    # Call twice so the assertions below can verify the second call is
    # served from the cache.
    first_result = self._api.related_media(wikidata_value.Item("Q1"))
    second_result = self._api.related_media(wikidata_value.Item("Q1"))
    # Look up the classes of every returned item; these lookups are expected
    # to be answered from the classes cached by related_media().
    actual_classes = {
        item: self._api.item_classes(item)
        for item in {
            *first_result.parents,
            *first_result.siblings,
            *first_result.children,
            *first_result.loose,
        }
    }

    # Convert the string IDs in the test parameters into the value types
    # that the API returns.
    expected_related_media = wikidata.RelatedMedia(
        **{
            key: frozenset(map(wikidata_value.Item, values))
            for key, values in expected_result.items()
        }
    )
    expected_classes = {
        wikidata_value.Item(item_id): frozenset(
            map(wikidata_value.Item, classes)
        )
        for item_id, classes in expected_cached_classes.items()
    }
    self.assertEqual(expected_related_media, first_result)
    self.assertEqual(expected_related_media, second_result)
    self.assertEqual(expected_classes, actual_classes)
    # Note that this only happens once because the second related_media()
    # call and all the item_classes() calls are cached.
    self._mock_session.get.assert_called_once()

def test_related_media_error(self) -> None:
    # A binding whose relation is not one that related_media() knows about.
    unexpected_binding = {
        "item": _sparql_item("Q2"),
        "relation": _sparql_string("kumquat"),
    }
    mock_response = self._mock_session.get.return_value
    mock_response.json.return_value = {
        "results": {"bindings": [unexpected_binding]}
    }
    queried_item = wikidata_value.Item("Q1")

    # The unexpected relation name should be reported in the error.
    with self.assertRaisesRegex(ValueError, "kumquat"):
        self._api.related_media(queried_item)


class WikidataUtilsTest(parameterized.TestCase):
# pylint: disable=protected-access
Expand Down
7 changes: 7 additions & 0 deletions rock_paper_sand/wikidata_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,16 @@ def from_string(cls, value: str) -> Self:


# Short alias for Property.from_string, used to define the property constants
# below from their Wikidata URLs.
_p = Property.from_string
# NOTE(review): the constants visible here are sorted alphabetically by name;
# presumably new ones should keep that order.
P_BASED_ON = _p("https://www.wikidata.org/wiki/Property:P144")
P_DATE_OF_FIRST_PERFORMANCE = _p("https://www.wikidata.org/wiki/Property:P1191")
P_DERIVATIVE_WORK = _p("https://www.wikidata.org/wiki/Property:P4969")
P_END_TIME = _p("https://www.wikidata.org/wiki/Property:P582")
P_FOLLOWED_BY = _p("https://www.wikidata.org/wiki/Property:P156")
P_FOLLOWS = _p("https://www.wikidata.org/wiki/Property:P155")
P_HAS_PARTS = _p("https://www.wikidata.org/wiki/Property:P527")
P_INSTANCE_OF = _p("https://www.wikidata.org/wiki/Property:P31")
P_PART_OF = _p("https://www.wikidata.org/wiki/Property:P361")
P_PART_OF_THE_SERIES = _p("https://www.wikidata.org/wiki/Property:P179")
P_PUBLICATION_DATE = _p("https://www.wikidata.org/wiki/Property:P577")
P_START_TIME = _p("https://www.wikidata.org/wiki/Property:P580")
P_SUBCLASS_OF = _p("https://www.wikidata.org/wiki/Property:P279")
Expand Down

0 comments on commit 47353c6

Please sign in to comment.