
Commit 57c3033

Merge pull request #3 from TogetherCrew/feat/2-single-platform-website-ingestion
feat: enhance community fetching with platform filtering and update w…

2 parents: 7855126 + d1d58c8

File tree: 5 files changed, +112 −7 lines


hivemind_etl/activities.py

Lines changed: 21 additions & 3 deletions

@@ -13,10 +13,28 @@
 @activity.defn
-async def get_communities() -> list[dict[str, Any]]:
-    """Fetch all communities that need to be processed."""
+async def get_communities(platform_id: str | None = None) -> list[dict[str, Any]]:
+    """
+    Fetch all communities that need to be processed when no platform id is given;
+    otherwise, process only the given platform's community.
+
+    Parameters
+    ----------
+    platform_id : str | None
+        The platform whose community should be fetched.
+        Defaults to `None`, meaning all communities are fetched.
+
+    Returns
+    -------
+    communities : list[dict[str, Any]]
+        A list of communities holding website information.
+    """
     try:
-        communities = ModulesWebsite().get_learning_platforms()
+        if platform_id:
+            logger.info("Website ingestion is filtered for a single community!")
+        communities = ModulesWebsite().get_learning_platforms(
+            filter_platform_id=platform_id
+        )
         logger.info(f"Found {len(communities)} communities to process")
         return communities
     except Exception as e:
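
A minimal sketch (not part of the commit) of how the updated activity could be exercised outside a Temporal worker, assuming `hivemind_etl.activities` is importable and the MongoDB connection it relies on is configured; the ObjectId string below is a placeholder.

```python
import asyncio

from hivemind_etl.activities import get_communities


async def main() -> None:
    # No argument: fetch every community with a website platform.
    all_communities = await get_communities()
    print(f"all: {len(all_communities)} communities")

    # With a platform id: fetch only the matching community.
    filtered = await get_communities(platform_id="6579c364f1120850414e0dc6")
    print(f"filtered: {len(filtered)} communities")


if __name__ == "__main__":
    asyncio.run(main())
```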

hivemind_etl/website/module.py

Lines changed: 11 additions & 0 deletions

@@ -10,10 +10,17 @@ def __init__(self) -> None:
     def get_learning_platforms(
         self,
+        filter_platform_id: str | None = None,
     ) -> list[dict[str, str | list[str]]]:
         """
         Get all the website communities with their page titles.

+        Parameters
+        ----------
+        filter_platform_id : str | None
+            The platform whose community should be fetched.
+            Defaults to `None`, meaning all communities are fetched.
+
         Returns
         ---------
         community_orgs : list[dict[str, str | list[str]]] = []
@@ -41,6 +48,10 @@ def get_learning_platforms(
             platform_id = platform["platform"]

+            # skip platforms other than the requested one, if a filter is given
+            if filter_platform_id and filter_platform_id != str(platform_id):
+                continue
+
             try:
                 website_links = self.get_platform_metadata(
                     platform_id=platform_id,
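
A standalone sketch (an illustration, not the module's code) of the comparison the new filter performs: the module documents store the platform as a bson `ObjectId`, while the filter arrives as a string, so both sides are compared as strings.

```python
from bson import ObjectId

# Hypothetical filter value and platform documents for illustration only.
filter_platform_id = "6579c364f1120850414e0dc6"
platforms = [
    {"platform": ObjectId("6579c364f1120850414e0dc6"), "name": "website"},
    {"platform": ObjectId("6579c364f1120850414e0dc7"), "name": "website"},
]

# Keep a platform only when no filter is set or its id matches the filter.
selected = [
    p
    for p in platforms
    if not filter_platform_id or filter_platform_id == str(p["platform"])
]
print(selected)  # only the platform matching the filter remains
```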

test_run_workflow.py

Lines changed: 2 additions & 1 deletion

@@ -45,9 +45,10 @@ async def start_workflow():
                 # id="schedules-say-hello",
                 id="schedules-website-ingestion",
                 task_queue=task_queue,
+                args=["platform_id"],
             ),
             spec=ScheduleSpec(
-                intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
+                intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
             ),
             state=ScheduleState(note="Here's a note on my Schedule."),
         ),
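
A sketch of how the schedule above might be created with the Temporal Python SDK. The server address and task queue name are assumptions, and the literal "platform_id" string mirrors the placeholder argument used in the diff; a real platform ObjectId string would be passed instead.

```python
import asyncio
from datetime import timedelta

from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    ScheduleSpec,
    ScheduleState,
)


async def main() -> None:
    client = await Client.connect("localhost:7233")
    await client.create_schedule(
        "schedules-website-ingestion",
        Schedule(
            action=ScheduleActionStartWorkflow(
                "WebsiteIngestionSchedulerWorkflow",
                args=["platform_id"],  # forwarded to the workflow's run()
                id="schedules-website-ingestion",
                task_queue="hivemind-etl",  # assumed task queue name
            ),
            spec=ScheduleSpec(
                intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
            ),
            state=ScheduleState(note="Here's a note on my Schedule."),
        ),
    )


if __name__ == "__main__":
    asyncio.run(main())
```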

tests/integration/test_module_website.py

Lines changed: 74 additions & 0 deletions

@@ -128,3 +128,77 @@ def test_get_website_communities_data_module_multiple_platforms(self):
                 "urls": ["link1", "link2"],
             },
         )
+
+    def test_get_website_communities_data_module_multiple_platforms_filtered_one(self):
+        """
+        Test get_learning_platforms when a community has multiple platforms
+        and a platform filter is applied.
+        Verifies that only the filtered platform's data is returned even when
+        other platforms exist.
+        """
+        platform_id = ObjectId("6579c364f1120850414e0dc6")
+        platform_id2 = ObjectId("6579c364f1120850414e0dc7")
+        community_id = ObjectId("6579c364f1120850414e0dc5")
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id,
+                "name": "website",
+                "metadata": {"resources": ["link1", "link2"]},
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+
+        self.client["Core"]["platforms"].insert_one(
+            {
+                "_id": platform_id2,
+                "name": "website",
+                "metadata": {"resources": ["link3", "link4"]},
+                "community": community_id,
+                "disconnectedAt": None,
+                "connectedAt": datetime.now(),
+                "createdAt": datetime.now(),
+                "updatedAt": datetime.now(),
+            }
+        )
+
+        self.client["Core"]["modules"].insert_one(
+            {
+                "name": "hivemind",
+                "community": community_id,
+                "options": {
+                    "platforms": [
+                        {
+                            "platform": platform_id,
+                            "name": "website",
+                            "metadata": {},
+                        },
+                        {
+                            "platform": platform_id2,
+                            "name": "website",
+                            "metadata": {},
+                        },
+                    ]
+                },
+            }
+        )
+
+        result = self.modules_website.get_learning_platforms(
+            filter_platform_id=str(platform_id)
+        )
+
+        # Assertions
+        self.assertIsInstance(result, list)
+        self.assertEqual(len(result), 1)
+
+        self.assertEqual(
+            result[0],
+            {
+                "community_id": "6579c364f1120850414e0dc5",
+                "platform_id": str(platform_id),
+                "urls": ["link1", "link2"],
+            },
+        )

workflows.py

Lines changed: 4 additions & 3 deletions

@@ -69,13 +69,14 @@ async def run(self, community_info: dict) -> None:
 @workflow.defn
 class WebsiteIngestionSchedulerWorkflow:
     @workflow.run
-    async def run(self) -> None:
+    async def run(self, platform_id: str | None = None) -> None:
         # Get all communities
         communities = await workflow.execute_activity(
             get_communities,
-            start_to_close_timeout=timedelta(minutes=5),
+            platform_id,
+            start_to_close_timeout=timedelta(minutes=20),
             retry_policy=RetryPolicy(
-                maximum_attempts=1,
+                maximum_attempts=3,
             ),
         )
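
A sketch (under assumptions) of triggering one filtered run of the workflow directly, outside the schedule. The server address, workflow id, and task queue are placeholders, and the workflow class is assumed importable from workflows.py; the schedule path above remains the usual entry point.

```python
import asyncio

from temporalio.client import Client

from workflows import WebsiteIngestionSchedulerWorkflow


async def main() -> None:
    client = await Client.connect("localhost:7233")
    await client.execute_workflow(
        WebsiteIngestionSchedulerWorkflow.run,
        "6579c364f1120850414e0dc6",  # platform_id; None would process all communities
        id="website-ingestion-single-platform",
        task_queue="hivemind-etl",  # assumed task queue name
    )


if __name__ == "__main__":
    asyncio.run(main())
```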
