Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions hivemind_etl/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,28 @@


@activity.defn
async def get_communities() -> list[dict[str, Any]]:
"""Fetch all communities that need to be processed."""
async def get_communities(platform_id: str | None = None) -> list[dict[str, Any]]:
"""
Fetch all communities that need to be processed in case of no platform id given
Else, just process for one platform

Parameters
-----------
platform_id : str | None
A platform's community to be fetched
for default it is as `None` meaning to get all communities information

Returns
---------
communities : list[dict[str, Any]]
a list of communities holding website informations
"""
try:
communities = ModulesWebsite().get_learning_platforms()
if platform_id:
logger.info("Website ingestion is filtered for a single community!")
communities = ModulesWebsite().get_learning_platforms(
filter_platform_id=platform_id
)
logger.info(f"Found {len(communities)} communities to process")
return communities
except Exception as e:
Expand Down
11 changes: 11 additions & 0 deletions hivemind_etl/website/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ def __init__(self) -> None:

def get_learning_platforms(
self,
filter_platform_id: str | None = None,
) -> list[dict[str, str | list[str]]]:
"""
Get all the website communities with their page titles.

Parameters
-----------
filter_platform_id : str | None
A platform's community to be fetched
for default it is as `None` meaning to get all communities information

Returns
---------
community_orgs : list[dict[str, str | list[str]]] = []
Expand Down Expand Up @@ -41,6 +48,10 @@ def get_learning_platforms(

platform_id = platform["platform"]

# if we needed to get specific platforms
if filter_platform_id and filter_platform_id != str(platform_id):
continue

try:
website_links = self.get_platform_metadata(
platform_id=platform_id,
Expand Down
3 changes: 2 additions & 1 deletion test_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ async def start_workflow():
# id="schedules-say-hello",
id="schedules-website-ingestion",
task_queue=task_queue,
args=["platform_id"],
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
),
state=ScheduleState(note="Here's a note on my Schedule."),
),
Expand Down
74 changes: 74 additions & 0 deletions tests/integration/test_module_website.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,77 @@ def test_get_website_communities_data_module_multiple_platforms(self):
"urls": ["link1", "link2"],
},
)

def test_get_website_communities_data_module_multiple_platforms_filtered_one(self):
"""
Test get_learning_platforms when a community has multiple platforms but with a platform filter applied
Verifies that only website platform data is returned even when
other platform types exist.
"""
platform_id = ObjectId("6579c364f1120850414e0dc6")
platform_id2 = ObjectId("6579c364f1120850414e0dc7")
community_id = ObjectId("6579c364f1120850414e0dc5")

self.client["Core"]["platforms"].insert_one(
{
"_id": platform_id,
"name": "website",
"metadata": {"resources": ["link1", "link2"]},
"community": community_id,
"disconnectedAt": None,
"connectedAt": datetime.now(),
"createdAt": datetime.now(),
"updatedAt": datetime.now(),
}
)

self.client["Core"]["platforms"].insert_one(
{
"_id": platform_id2,
"name": "website",
"metadata": {"resources": ["link3", "link4"]},
"community": community_id,
"disconnectedAt": None,
"connectedAt": datetime.now(),
"createdAt": datetime.now(),
"updatedAt": datetime.now(),
}
)

self.client["Core"]["modules"].insert_one(
{
"name": "hivemind",
"community": community_id,
"options": {
"platforms": [
{
"platform": platform_id,
"name": "website",
"metadata": {},
},
{
"platform": platform_id2,
"name": "website",
"metadata": {},
},
]
},
}
)

result = self.modules_website.get_learning_platforms(
filter_platform_id=str(platform_id)
)

# Assertions
self.assertIsInstance(result, list)
self.assertEqual(len(result), 1)

self.assertEqual(
result[0],
{
"community_id": "6579c364f1120850414e0dc5",
"platform_id": str(platform_id),
"urls": ["link1", "link2"],
},
)
7 changes: 4 additions & 3 deletions workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ async def run(self, community_info: dict) -> None:
@workflow.defn
class WebsiteIngestionSchedulerWorkflow:
@workflow.run
async def run(self) -> None:
async def run(self, platform_id: str | None = None) -> None:
# Get all communities
communities = await workflow.execute_activity(
get_communities,
start_to_close_timeout=timedelta(minutes=5),
platform_id,
start_to_close_timeout=timedelta(minutes=20),
retry_policy=RetryPolicy(
maximum_attempts=1,
maximum_attempts=3,
),
)

Expand Down