Skip to content

Commit 71e60f0

Browse files
committed
feat: increase activity run time limit & update workflow id!
1 parent 099e257 commit 71e60f0

File tree

2 files changed

+10
-11
lines changed

2 files changed

+10
-11
lines changed

hivemind_etl/website/website_etl.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ def __init__(
2323
self.community_id = community_id
2424
collection_name = "website"
2525

26-
# preparing the data extractor and ingestion pipelines
27-
# self.crawlee_client = CrawleeClient()
26+
# preparing the ingestion pipeline
2827
self.ingestion_pipeline = CustomIngestionPipeline(
2928
self.community_id, collection_name=collection_name
3029
)
@@ -51,9 +50,9 @@ async def extract(
5150

5251
extracted_data = []
5352
for url in urls:
54-
crawlee_client = CrawleeClient()
53+
self.crawlee_client = CrawleeClient()
5554
logging.info(f"Crawling {url} and its routes!")
56-
data = await crawlee_client.crawl(links=[url])
55+
data = await self.crawlee_client.crawl(links=[url])
5756
logging.info(f"{len(data)} data is extracted for route: {url}")
5857
extracted_data.extend(data)
5958

workflows.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,18 @@ async def run(self, community_info: dict) -> None:
3434
raw_data = await workflow.execute_activity(
3535
extract_website,
3636
args=[urls, community_id],
37-
start_to_close_timeout=timedelta(minutes=10),
37+
start_to_close_timeout=timedelta(minutes=30),
3838
retry_policy=RetryPolicy(
3939
initial_interval=timedelta(seconds=10),
4040
maximum_interval=timedelta(minutes=5),
41-
maximum_attempts=1,
41+
maximum_attempts=3,
4242
),
4343
)
4444

4545
documents = await workflow.execute_activity(
4646
transform_data,
4747
args=[raw_data, community_id],
48-
start_to_close_timeout=timedelta(minutes=5),
48+
start_to_close_timeout=timedelta(minutes=10),
4949
retry_policy=RetryPolicy(
5050
initial_interval=timedelta(seconds=5),
5151
maximum_interval=timedelta(minutes=2),
@@ -56,11 +56,11 @@ async def run(self, community_info: dict) -> None:
5656
await workflow.execute_activity(
5757
load_data,
5858
args=[documents, community_id],
59-
start_to_close_timeout=timedelta(minutes=5),
59+
start_to_close_timeout=timedelta(minutes=60),
6060
retry_policy=RetryPolicy(
6161
initial_interval=timedelta(seconds=5),
6262
maximum_interval=timedelta(minutes=2),
63-
maximum_attempts=1,
63+
maximum_attempts=3,
6464
),
6565
)
6666

@@ -74,7 +74,7 @@ async def run(self, platform_id: str | None = None) -> None:
7474
communities = await workflow.execute_activity(
7575
get_communities,
7676
platform_id,
77-
start_to_close_timeout=timedelta(minutes=20),
77+
start_to_close_timeout=timedelta(minutes=5),
7878
retry_policy=RetryPolicy(
7979
maximum_attempts=3,
8080
),
@@ -86,7 +86,7 @@ async def run(self, platform_id: str | None = None) -> None:
8686
child_handle = await workflow.start_child_workflow(
8787
CommunityWebsiteWorkflow.run,
8888
args=[community],
89-
id=f"website-ingest-{community['community_id']}-{workflow.now().strftime('%Y%m%d%H%M')}",
89+
id=f"website:ingestor:{community['community_id']}",
9090
retry_policy=RetryPolicy(
9191
maximum_attempts=1,
9292
),

0 commit comments

Comments
 (0)