|
1 | | -""" |
2 | | -通过编辑这个脚本,可以自定义需要的后台任务 |
3 | | -""" |
4 | | -import schedule |
5 | | -import time |
6 | | -from topnews import pipeline |
7 | | -from loguru import logger |
8 | | -from utils.pb_api import PbTalker |
9 | | -import os |
10 | | -from utils.general_utils import get_logger_level |
11 | | -from datetime import datetime, timedelta |
12 | | -import pytz |
13 | | -import requests |
| 1 | +import asyncio |
| 2 | +from insights import pipeline, pb, logger |
14 | 3 |
|
# Number of completed scheduler passes. schedule_pipeline increments it once
# per loop; process_site compares it against each site's 'per_hours' cadence
# to decide whether that site is due for a crawl this pass.
counter = 0
15 | 5 |
|
16 | | -project_dir = os.environ.get("PROJECT_DIR", "") |
17 | | -if project_dir: |
18 | | - os.makedirs(project_dir, exist_ok=True) |
19 | | -logger_file = os.path.join(project_dir, 'tasks.log') |
20 | | -dsw_log = get_logger_level() |
21 | | -logger.add( |
22 | | - logger_file, |
23 | | - level=dsw_log, |
24 | | - backtrace=True, |
25 | | - diagnose=True, |
26 | | - rotation="50 MB" |
27 | | -) |
28 | 6 |
|
29 | | -pb = PbTalker(logger) |
30 | | -utc_now = datetime.now(pytz.utc) |
31 | | -# 减去一天得到前一天的UTC时间 |
32 | | -utc_yesterday = utc_now - timedelta(days=1) |
33 | | -utc_last = utc_yesterday.strftime("%Y-%m-%d %H:%M:%S") |
34 | | - |
35 | | - |
36 | | -def task(): |
37 | | - """ |
38 | | - global counter |
39 | | - sites = pb.read('sites', filter='activated=True') |
40 | | - urls = [] |
41 | | - for site in sites: |
42 | | - if not site['per_hours'] or not site['url']: |
43 | | - continue |
44 | | - if counter % site['per_hours'] == 0: |
45 | | - urls.append(site['url']) |
46 | | - logger.info(f'\033[0;32m task execute loop {counter}\033[0m') |
47 | | - logger.info(urls) |
48 | | - if urls: |
49 | | - sp(sites=urls) |
50 | | - else: |
51 | | - if counter % 24 == 0: |
52 | | - sp() |
53 | | - else: |
54 | | - print('\033[0;33mno work for this loop\033[0m') |
55 | | - counter += 1 |
56 | | - """ |
57 | | - global utc_last |
58 | | - logger.debug(f'last_collect_time: {utc_last}') |
59 | | - datas = pb.read(collection_name='insights', filter=f'updated>="{utc_last}"', fields=['id', 'content', 'tag', 'articles']) |
60 | | - logger.debug(f"got {len(datas)} items") |
61 | | - utc_last = datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S") |
62 | | - logger.debug(f'now_utc_time: {utc_last}') |
63 | | - |
64 | | - tags = pb.read(collection_name='tags', filter=f'activated=True') |
65 | | - tags_dict = {item["id"]: item["name"] for item in tags if item["name"]} |
66 | | - top_news = {} |
67 | | - for id, name in tags_dict.items(): |
68 | | - logger.debug(f'tag: {name}') |
69 | | - data = [item for item in datas if item['tag'] == id] |
70 | | - topnew = pipeline(data, logger) |
71 | | - if not topnew: |
72 | | - logger.debug(f'no top news for {name}') |
73 | | - continue |
74 | | - |
75 | | - top_news[id] = {} |
76 | | - for content, articles in topnew.items(): |
77 | | - content_urls = [pb.read('articles', filter=f'id="{a}"', fields=['url'])[0]['url'] for a in articles] |
78 | | - # 去除重叠内容 |
79 | | - # 如果发现重叠内容,哪个标签长就把对应的从哪个标签删除 |
80 | | - to_skip = False |
81 | | - for k, v in top_news.items(): |
82 | | - to_del_key = None |
83 | | - for c, u in v.items(): |
84 | | - if not set(content_urls).isdisjoint(set(u)): |
85 | | - if len(topnew) > len(v): |
86 | | - to_skip = True |
87 | | - else: |
88 | | - to_del_key = c |
89 | | - break |
90 | | - if to_del_key: |
91 | | - del top_news[k][to_del_key] |
92 | | - if to_skip: |
93 | | - break |
94 | | - if not to_skip: |
95 | | - top_news[id][content] = content_urls |
96 | | - |
97 | | - if not top_news[id]: |
98 | | - del top_news[id] |
99 | | - |
100 | | - if not top_news: |
101 | | - logger.info("no top news today") |
async def process_site(site, counter):
    """Crawl one site record if its cadence matches the current loop count.

    Parameters
    ----------
    site : dict
        Record from the 'sites' collection; expected keys are 'url'
        (the site address) and 'per_hours' (crawl once every N loops).
    counter : int
        Number of completed scheduler loops so far.

    Returns None in all cases; the crawl itself happens as a side effect
    of awaiting pipeline().
    """
    # .get() instead of [] so a malformed record (missing keys) is skipped
    # rather than killing the whole gather() with a KeyError.
    per_hours = site.get('per_hours')
    url = site.get('url')
    if not per_hours or not url:
        return
    # Fire only on passes that are a multiple of this site's cadence.
    if counter % per_hours == 0:
        logger.info(f"applying {url}")
        request_input = {
            "user_id": "schedule_tasks",
            "type": "site",
            "content": url,
            "addition": f"task execute loop {counter + 1}"
        }
        await pipeline(request_input)
| 19 | + |
| 20 | + |
async def schedule_pipeline(interval):
    """Run the crawl loop forever.

    Each pass reads the activated site records, fans them all out
    concurrently through process_site(), bumps the global pass counter,
    and then sleeps.

    Parameters
    ----------
    interval : int
        Seconds to pause between passes.
    """
    global counter
    while True:
        active_sites = pb.read('sites', filter='activated=True')
        logger.info(f'task execute loop {counter + 1}')
        # One coroutine per site; gather runs them concurrently and waits
        # for every one to finish before the pass is counted as done.
        jobs = [process_site(site, counter) for site in active_sites]
        await asyncio.gather(*jobs)

        counter += 1
        logger.info(f'task execute loop finished, work after {interval} seconds')
        await asyncio.sleep(interval)
145 | 31 |
|
146 | 32 |
|
147 | | -schedule.every().day.at("07:38").do(task) |
async def main(interval_hours=1):
    """Entry point: start the scheduler loop.

    Parameters
    ----------
    interval_hours : int or float, optional
        Hours between scheduler passes (default 1, matching the previous
        hard-coded value, so existing `main()` calls behave identically).
    """
    interval_seconds = interval_hours * 60 * 60
    await schedule_pipeline(interval_seconds)
148 | 37 |
|
149 | | -task() |
150 | | -while True: |
151 | | - schedule.run_pending() |
152 | | - time.sleep(60) |
if __name__ == "__main__":
    # Guard the entry point so importing this module (e.g. for tests or
    # tooling) does not start the infinite scheduler loop as a side effect.
    asyncio.run(main())
0 commit comments