-from ..scrapers import *
-from ..utils.general_utils import extract_urls, compare_phrase_with_list
+# -*- coding: utf-8 -*-
+
+from scrapers import *
+from utils.general_utils import extract_urls, compare_phrase_with_list
 from .get_info import get_info, pb, project_dir, logger, info_rewrite
 import os
 import json
 from datetime import datetime, timedelta
 from urllib.parse import urlparse
 import re
-import time


 # The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg
 existing_urls = [url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']]


-def pipeline(_input: dict):
+async def get_articles(urls: list[str], expiration: datetime, cache: dict = {}) -> list[dict]:
+    articles = []
+    for url in urls:
+        logger.debug(f"fetching {url}")
+
+        if url.startswith('https://mp.weixin.qq.com') or url.startswith('http://mp.weixin.qq.com'):
+            flag, result = await mp_crawler(url, logger)
+        else:
+            flag, result = await simple_crawler(url, logger)
+
+        if flag == -7:
+            # -7 means the html could not be fetched, and other crawlers will not help either
+            continue
+
+        if flag != 11:
+            flag, result = await llm_crawler(url, logger)
+            if flag != 11:
+                continue
+
+        expiration_date = expiration.strftime('%Y-%m-%d')
+        article_date = int(result['publish_time'])
+        if article_date < int(expiration_date.replace('-', '')):
+            logger.info(f"publish date is {article_date}, too old, skip")
+            continue
+
+        if url in cache:
+            for k, v in cache[url].items():
+                if v:
+                    result[k] = v
+
+        articles.append(result)
+
+    return articles
+
+
+async def pipeline(_input: dict):
     cache = {}
     source = _input['user_id'].split('@')[-1]
     logger.debug(f"received new task, user: {source}, MsgSvrID: {_input['addition']}")

+    global existing_urls
+    expiration_date = datetime.now() - timedelta(days=expiration_days)
+
     if _input['type'] == 'publicMsg':
         items = item_pattern.findall(_input["content"])
         # Iterate through all <item> content, extracting <url> and <summary>
@@ -37,73 +76,57 @@ def pipeline(_input: dict):
             cut_off_point = url.find('chksm=')
             if cut_off_point != -1:
                 url = url[:cut_off_point - 1]
+            if url in existing_urls:
+                logger.debug(f"{url} has been crawled, skip")
+                continue
             if url in cache:
                 logger.debug(f"{url} already find in item")
                 continue
             summary_match = summary_pattern.search(item)
             summary = summary_match.group(1) if summary_match else None
-            cache[url] = summary
-        urls = list(cache.keys())
+            cache[url] = {'source': source, 'abstract': summary}
+        articles = await get_articles(list(cache.keys()), expiration_date, cache)

-    elif _input['type'] == 'text':
+    elif _input['type'] == 'site':
+        # a site url is usually an article list page or a website homepage,
+        # so the list of article urls needs to be fetched first.
+        # You can use the general scraper, or customize a site-specific crawler; see scrapers/README_CN.md
         urls = extract_urls(_input['content'])
         if not urls:
-            logger.debug(f"can not find any url in\n{_input['content']}\npass...")
+            logger.debug(f"can not find any url in\n{_input['content']}")
             return
-    elif _input['type'] == 'url':
-        urls = []
-        pass
-    else:
-        return
-
-    global existing_urls
-
-    for url in urls:
-        if url in existing_urls:
-            logger.debug(f"{url} has been crawled, skip")
-            continue
-
-        logger.debug(f"fetching {url}")
-        if url.startswith('https://mp.weixin.qq.com') or url.startswith('http://mp.weixin.qq.com'):
-            flag, article = mp_crawler(url, logger)
-            if flag == -7:
-                # For the mp crawler, -7 most likely means rate limiting by WeChat, so just wait 1 min and retry
-                logger.info(f"fetch {url} failed, try to wait 1min and try again")
-                time.sleep(60)
-                flag, article = mp_crawler(url, logger)
-        else:
+        articles = []
+        for url in urls:
             parsed_url = urlparse(url)
             domain = parsed_url.netloc
             if domain in scraper_map:
-                flag, article = scraper_map[domain](url, logger)
+                result = scraper_map[domain](url, logger)
             else:
-                flag, article = simple_crawler(url, logger)
+                result = await general_scraper(url, expiration_date.date(), existing_urls, logger)
+            articles.extend(result)

-        if flag == -7:
-            # -7 means a network problem, and other crawlers will not help either
-            logger.info(f"cannot fetch {url}")
-            continue
+    elif _input['type'] == 'text':
+        urls = extract_urls(_input['content'])
+        if not urls:
+            logger.debug(f"can not find any url in\n{_input['content']}\npass...")
+            return
+        articles = await get_articles(urls, expiration_date)

-        if flag != 11:
-            logger.info(f"{url} failed with mp_crawler and simple_crawler")
-            flag, article = llm_crawler(url, logger)
-            if flag != 11:
-                logger.info(f"{url} failed with llm_crawler")
-                continue
+    elif _input['type'] == 'url':
+        # reserved for the WeChat shared mp_article_card
+        # todo: will be handled in project awada (the generalMsg api needs to be finished first)
+        articles = []
+    else:
+        return

-        expiration_date = datetime.now() - timedelta(days=expiration_days)
-        expiration_date = expiration_date.strftime('%Y-%m-%d')
-        article_date = int(article['publish_time'])
-        if article_date < int(expiration_date.replace('-', '')):
-            logger.info(f"publish date is {article_date}, too old, skip")
+    for article in articles:
+        if article['url'] in existing_urls:
+            # when multiple sites are submitted at the same time,
+            # duplicate articles can indeed end up in the same batch
+            logger.debug(f"{article['url']} duplicated, skip")
             continue

-        article['source'] = source
-        if cache[url]:
-            article['abstract'] = cache[url]
-
         insights = get_info(f"title: {article['title']}\n\ncontent: {article['content']}")
-
         try:
             article_id = pb.add(collection_name='articles', body=article)
         except Exception as e:
@@ -112,7 +135,7 @@ def pipeline(_input: dict):
                 json.dump(article, f, ensure_ascii=False, indent=4)
             continue

-        existing_urls.append(url)
+        existing_urls.append(article['url'])

         if not insights:
             continue
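
In the new 'site' branch, the pipeline first looks up the url's domain in scraper_map and only falls back to general_scraper when no site-specific crawler is registered. Below is a minimal sketch of what registering such a crawler could look like. The function name, the example domain, and the returned fields are assumptions inferred from how the results are consumed downstream (article['url'], article['title'], article['content']); the authoritative contract is described in scrapers/README_CN.md.

# A hedged sketch, not part of this commit: registering a site-specific scraper.
from urllib.parse import urlparse


def example_site_scraper(url: str, logger) -> list[dict]:
    # hypothetical custom scraper for one site; a real one would fetch and parse the page
    logger.debug(f"custom scraping {url}")
    # field names are assumptions based on the downstream code in this diff
    return [{'url': url, 'title': 'placeholder', 'content': 'placeholder', 'publish_time': '20240101'}]


# scraper_map maps a domain (netloc) to its scraper, as used in the diff above
scraper_map = {'news.example.com': example_site_scraper}

domain = urlparse('https://news.example.com/list').netloc
handler = scraper_map.get(domain)  # the pipeline falls back to general_scraper when the domain is absent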
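Because pipeline() is now a coroutine, callers have to await it or drive it with an event loop instead of calling it directly. A minimal usage sketch follows; the message values are invented for illustration and only mirror the fields the function reads ('user_id', 'type', 'content', 'addition').

# A hedged usage sketch, assuming pipeline() is imported from this module.
import asyncio

example_msg = {
    'user_id': 'bot@example.com',            # source is taken from the part after '@'
    'type': 'site',                          # one of: 'publicMsg', 'site', 'text', 'url'
    'content': 'https://news.example.com/list',
    'addition': 'MsgSvrID-placeholder',
}

asyncio.run(pipeline(example_msg))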