Commit d9f1fd6

Merge pull request #15 from TogetherCrew/feat/14-mediawiki-etl
feat: added media wiki ETL!
2 parents bb5c53a + 56d997a commit d9f1fd6

18 files changed: +1328 / -171 lines

.gitignore

Lines changed: 5 additions & 1 deletion
@@ -161,4 +161,8 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/

-main.ipynb
+main.ipynb
+
+*.xml
+
+dump_*

hivemind_etl/activities.py

Lines changed: 14 additions & 79 deletions
@@ -1,91 +1,26 @@
 import logging
 from typing import Any

-from temporalio import activity, workflow
+from hivemind_etl.website.activities import (
+    get_hivemind_website_comminities,
+    extract_website,
+    transform_website_data,
+    load_website_data,
+)
+from hivemind_etl.mediawiki.activities import (
+    get_hivemind_mediawiki_platforms,
+    extract_mediawiki,
+    transform_mediawiki_data,
+    load_mediawiki_data,
+)
+
+from temporalio import activity

-with workflow.unsafe.imports_passed_through():
-    from hivemind_etl.website.module import ModulesWebsite
-    from hivemind_etl.website.website_etl import WebsiteETL
-    from llama_index.core import Document

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)


-@activity.defn
-async def get_communities(platform_id: str | None = None) -> list[dict[str, Any]]:
-    """
-    Fetch all communities that need to be processed in case of no platform id given
-    Else, just process for one platform
-
-    Parameters
-    -----------
-    platform_id : str | None
-        A platform's community to be fetched
-        for default it is as `None` meaning to get all communities information
-
-    Returns
-    ---------
-    communities : list[dict[str, Any]]
-        a list of communities holding website informations
-    """
-    try:
-        if platform_id:
-            logger.info("Website ingestion is filtered for a single community!")
-        communities = ModulesWebsite().get_learning_platforms(
-            filter_platform_id=platform_id
-        )
-        logger.info(f"Found {len(communities)} communities to process")
-        logging.info(f"communities: {communities}")
-        return communities
-    except Exception as e:
-        logger.error(f"Error fetching communities: {str(e)}")
-        raise
-
-
-@activity.defn
-async def extract_website(urls: list[str], community_id: str) -> list[dict]:
-    """Extract data from website URLs."""
-    try:
-        logger.info(
-            f"Starting extraction for community {community_id} with {len(urls)} URLs"
-        )
-        website_etl = WebsiteETL(community_id=community_id)
-        result = await website_etl.extract(urls=urls)
-        logger.info(f"Completed extraction for community {community_id}")
-        return result
-    except Exception as e:
-        logger.error(f"Error in extraction for community {community_id}: {str(e)}")
-        raise
-
-
-@activity.defn
-async def transform_data(raw_data: list[dict], community_id: str) -> list[Document]:
-    """Transform the extracted raw data."""
-    try:
-        logger.info(f"Starting transformation for community {community_id}")
-        website_etl = WebsiteETL(community_id=community_id)
-        result = website_etl.transform(raw_data=raw_data)
-        logger.info(f"Completed transformation for community {community_id}")
-        return result
-    except Exception as e:
-        logger.error(f"Error in transformation for community {community_id}: {str(e)}")
-        raise
-
-
-@activity.defn
-async def load_data(documents: list[Document], community_id: str) -> None:
-    """Load the transformed data into the database."""
-    try:
-        logger.info(f"Starting data load for community {community_id}")
-        website_etl = WebsiteETL(community_id=community_id)
-        website_etl.load(documents=documents)
-        logger.info(f"Completed data load for community {community_id}")
-    except Exception as e:
-        logger.error(f"Error in data load for community {community_id}: {str(e)}")
-        raise
-
-
 @activity.defn
 async def say_hello():
     return 7
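
With this change, `hivemind_etl/activities.py` only re-exports the website and MediaWiki activities plus `say_hello`, so a Temporal worker can register everything from a single import. A minimal sketch of that wiring, assuming a local Temporal server at `localhost:7233` and a hypothetical `hivemind-etl` task queue (neither appears in this diff):

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from hivemind_etl.activities import (
    get_hivemind_website_comminities,
    extract_website,
    transform_website_data,
    load_website_data,
    get_hivemind_mediawiki_platforms,
    extract_mediawiki,
    transform_mediawiki_data,
    load_mediawiki_data,
    say_hello,
)


async def main() -> None:
    # Server address and task queue name are placeholders for illustration.
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="hivemind-etl",  # hypothetical task queue name
        activities=[
            get_hivemind_website_comminities,
            extract_website,
            transform_website_data,
            load_website_data,
            get_hivemind_mediawiki_platforms,
            extract_mediawiki,
            transform_mediawiki_data,
            load_mediawiki_data,
            say_hello,
        ],
    )
    # Workflows would be registered on the same worker via the `workflows` argument.
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```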

hivemind_etl/mediawiki/__init__.py

Whitespace-only changes.
hivemind_etl/mediawiki/activities.py

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
import logging
from typing import Any

from temporalio import activity, workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.mediawiki.module import ModulesMediaWiki
    from hivemind_etl.mediawiki.etl import MediawikiETL
    from llama_index.core import Document


@activity.defn
async def get_hivemind_mediawiki_platforms(
    platform_id: str | None = None,
) -> list[dict[str, Any]]:
    """
    Fetch all MediaWiki communities that need to be processed in case of no platform id given
    Else, just process for one platform

    Parameters
    -----------
    platform_id : str | None
        A platform's community to be fetched
        for default it is as `None` meaning to get all platforms information

    example data output:
    ```
    [{
        "community_id": "6579c364f1120850414e0dc5",
        "base_url": "some_api_url",
        "namespaces": [1, 2, 3],
    }]
    ```

    Returns
    ---------
    platforms : list[dict[str, Any]]
        a list of platforms holding MediaWiki informations
    """
    try:
        if platform_id:
            logging.info("MediaWiki ingestion is filtered for a single platform!")
        platforms = ModulesMediaWiki().get_learning_platforms(
            platform_id_filter=platform_id
        )
        logging.info(f"Found {len(platforms)} platforms to process")
        logging.info(f"platforms: {platforms}")
        return platforms
    except Exception as e:
        logging.error(f"Error fetching MediaWiki platforms: {str(e)}")
        raise


@activity.defn
async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
    """Extract data from MediaWiki API URL."""
    try:
        community_id = mediawiki_platform["community_id"]
        api_url = mediawiki_platform["base_url"]
        namespaces = mediawiki_platform["namespaces"]

        logging.info(
            f"Starting extraction for community {community_id} with API URL: {api_url}"
        )
        mediawiki_etl = MediawikiETL(community_id=community_id, namespaces=namespaces)
        mediawiki_etl.extract(api_url=api_url)
        logging.info(f"Completed extraction for community {community_id}")
    except Exception as e:
        community_id = mediawiki_platform["community_id"]
        logging.error(f"Error in extraction for community {community_id}: {str(e)}")
        raise


@activity.defn
async def transform_mediawiki_data(community_id: str) -> list[Document]:
    """Transform the extracted MediaWiki data."""
    try:
        logging.info(f"Starting transformation for community {community_id}")
        mediawiki_etl = MediawikiETL(community_id=community_id)
        result = mediawiki_etl.transform()
        logging.info(f"Completed transformation for community {community_id}")
        return result
    except Exception as e:
        logging.error(f"Error in transformation for community {community_id}: {str(e)}")
        raise


@activity.defn
async def load_mediawiki_data(documents: list[Document], community_id: str) -> None:
    """Load the transformed MediaWiki data into the database."""
    try:
        logging.info(f"Starting data load for community {community_id}")
        mediawiki_etl = MediawikiETL(community_id=community_id)
        mediawiki_etl.load(documents=documents)
        logging.info(f"Completed data load for community {community_id}")
    except Exception as e:
        logging.error(f"Error in data load for community {community_id}: {str(e)}")
        raise
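
The new activities follow a per-platform extract, transform, load sequence: `extract_mediawiki` receives the whole platform dict, while the transform and load steps only need the `community_id`. A minimal workflow sketch of how they could be chained, assuming a hypothetical workflow class name and illustrative timeouts (the actual workflow definition is not part of this file):

```python
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_etl.mediawiki.activities import (
        get_hivemind_mediawiki_platforms,
        extract_mediawiki,
        transform_mediawiki_data,
        load_mediawiki_data,
    )


@workflow.defn
class MediaWikiETLWorkflow:  # hypothetical name, for illustration only
    @workflow.run
    async def run(self, platform_id: str | None = None) -> None:
        platforms = await workflow.execute_activity(
            get_hivemind_mediawiki_platforms,
            platform_id,
            start_to_close_timeout=timedelta(minutes=5),
        )
        for platform in platforms:
            community_id = platform["community_id"]
            # Dump the wiki to disk, parse it into Documents, then index them.
            await workflow.execute_activity(
                extract_mediawiki,
                platform,
                start_to_close_timeout=timedelta(hours=2),
            )
            documents = await workflow.execute_activity(
                transform_mediawiki_data,
                community_id,
                start_to_close_timeout=timedelta(hours=1),
            )
            await workflow.execute_activity(
                load_mediawiki_data,
                args=[documents, community_id],
                start_to_close_timeout=timedelta(hours=1),
            )
```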

hivemind_etl/mediawiki/etl.py

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
import logging
import shutil

from llama_index.core import Document
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
from hivemind_etl.mediawiki.transform_xml import parse_mediawiki_xml
from hivemind_etl.mediawiki.wikiteam_crawler import WikiteamCrawler


class MediawikiETL:
    def __init__(
        self,
        community_id: str,
        namespaces: list[int],
        delete_dump_after_load: bool = True,
    ) -> None:
        self.community_id = community_id
        self.wikiteam_crawler = WikiteamCrawler(community_id, namespaces=namespaces)

        self.dump_dir = f"dump_{self.community_id}"
        self.delete_dump_after_load = delete_dump_after_load

    def extract(self, api_url: str, dump_dir: str | None = None) -> None:
        if dump_dir is None:
            dump_dir = self.dump_dir
        else:
            self.dump_dir = dump_dir

        self.wikiteam_crawler.crawl(api_url, dump_dir)

    def transform(self) -> list[Document]:
        pages = parse_mediawiki_xml(file_dir=self.dump_dir)

        documents: list[Document] = []
        for page in pages:
            try:
                documents.append(
                    Document(
                        doc_id=page.page_id,
                        text=page.revision.text,
                        metadata={
                            "title": page.title,
                            "namespace": page.namespace,
                            "revision_id": page.revision.revision_id,
                            "parent_revision_id": page.revision.parent_revision_id,
                            "timestamp": page.revision.timestamp,
                            "comment": page.revision.comment,
                            "contributor_username": page.revision.contributor.username,
                            "contributor_user_id": page.revision.contributor.user_id,
                            "sha1": page.revision.sha1,
                            "model": page.revision.model,
                        },
                        excluded_embed_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                            "comment",
                            "timestamp",
                        ],
                        excluded_llm_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                        ],
                    )
                )
            except Exception as e:
                logging.error(f"Error transforming page {page.page_id}: {e}")

        return documents

    def load(self, documents: list[Document]) -> None:
        ingestion_pipeline = CustomIngestionPipeline(
            self.community_id, collection_name="mediawiki"
        )
        ingestion_pipeline.run_pipeline(documents)

        if self.delete_dump_after_load:
            shutil.rmtree(self.dump_dir)
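
`MediawikiETL` can also be exercised directly, outside Temporal, which makes the three stages easy to see: `extract` dumps the wiki into `dump_<community_id>/` via `WikiteamCrawler`, `transform` turns the parsed pages into llama_index `Document`s carrying revision metadata, and `load` pushes them into the community's `mediawiki` Qdrant collection through `CustomIngestionPipeline`. A small sketch with placeholder values (the community id is taken from the docstring example above; the API URL and namespaces are made up):

```python
from hivemind_etl.mediawiki.etl import MediawikiETL

etl = MediawikiETL(
    community_id="6579c364f1120850414e0dc5",  # example id from the docstring above
    namespaces=[0],  # hypothetical: main namespace only
    delete_dump_after_load=False,  # keep the dump directory around for inspection
)

# 1. Crawl the wiki through its API and write the dump under dump_<community_id>/
etl.extract(api_url="https://wiki.example.org/w/api.php")  # placeholder URL

# 2. Parse the dumped XML into Documents with title, revision, and contributor metadata
documents = etl.transform()

# 3. Embed and upsert the Documents into the community's "mediawiki" collection
etl.load(documents)
```
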
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
"""XML Reader."""

"""Copied from https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/xml/base.py"""

import re
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Optional

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


def _get_leaf_nodes_up_to_level(root: ET.Element, level: int) -> List[ET.Element]:
    """Get collection of nodes up to certain level including leaf nodes.

    Args:
        root (ET.Element): XML Root Element
        level (int): Levels to traverse in the tree

    Returns:
        List[ET.Element]: List of target nodes
    """

    def traverse(current_node, current_level):
        if len(current_node) == 0 or level == current_level:
            # Keep leaf nodes and target level nodes
            nodes.append(current_node)
        elif current_level < level:
            # Move to the next level
            for child in current_node:
                traverse(child, current_level + 1)

    nodes = []
    traverse(root, 0)
    return nodes


class XMLReader(BaseReader):
    """XML reader.

    Reads XML documents with options to help suss out relationships between nodes.

    Args:
        tree_level_split (int): From which level in the xml tree we split documents,
            the default level is the root which is level 0

    """

    def __init__(self, tree_level_split: Optional[int] = 0) -> None:
        """Initialize with arguments."""
        super().__init__()
        self.tree_level_split = tree_level_split

    def _parse_xmlelt_to_document(
        self, root: ET.Element, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the xml object into a list of Documents.

        Args:
            root: The XML Element to be converted.
            extra_info (Optional[Dict]): Additional information. Default is None.

        Returns:
            Document: The documents.
        """
        nodes = _get_leaf_nodes_up_to_level(root, self.tree_level_split)
        documents = []
        for node in nodes:
            content = ET.tostring(node, encoding="utf8").decode("utf-8")
            content = re.sub(r"^<\?xml.*", "", content)
            content = content.strip()
            documents.append(Document(text=content, extra_info=extra_info or {}))

        return documents

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load data from the input file.

        Args:
            file (Path): Path to the input file.
            extra_info (Optional[Dict]): Additional information. Default is None.

        Returns:
            List[Document]: List of documents.
        """
        if not isinstance(file, Path):
            file = Path(file)

        tree = ET.parse(file)
        return self._parse_xmlelt_to_document(tree.getroot(), extra_info)
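
A short usage sketch of this vendored reader. The import path and dump file name below are assumptions for illustration; with `tree_level_split=1`, each direct child of the XML root (for example, each `<page>` element of a MediaWiki export) becomes its own `Document`:

```python
from pathlib import Path

# Hypothetical import path for the vendored reader shown above.
from hivemind_etl.mediawiki.xml_reader import XMLReader

reader = XMLReader(tree_level_split=1)
documents = reader.load_data(
    file=Path("dump_6579c364f1120850414e0dc5/pages.xml"),  # placeholder dump file
    extra_info={"community_id": "6579c364f1120850414e0dc5"},
)
print(f"parsed {len(documents)} documents")
```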
