Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 97 additions & 17 deletions app/messages/notification_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import base64
import hashlib
import hmac
import re
import smtplib
import time
from email.utils import parsedate_to_datetime
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
Expand All @@ -14,6 +18,45 @@
class NotificationService:
def __init__(self):
self.headers = {"Content-Type": "application/json"}
self._feishu_time_cache: dict[str, float] = {"timestamp": 0.0, "fetched_at": 0.0}

async def _get_feishu_timestamp(self) -> str:
"""Get Feishu-compatible timestamp with multi-source fallback and sanity checks."""
now = time.time()
cached_ts = self._feishu_time_cache.get("timestamp", 0.0)
fetched_at = self._feishu_time_cache.get("fetched_at", 0.0)
if cached_ts and (now - fetched_at) < 300:
return str(int(cached_ts))

time_sources = [
"https://open.feishu.cn/",
"https://www.feishu.cn/",
"https://www.cloudflare.com/",
]
try:
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
for url in time_sources:
for method in ("HEAD", "GET"):
try:
resp = await client.request(method, url)
date_header = resp.headers.get("Date") or resp.headers.get("date")
if not date_header:
continue
dt = parsedate_to_datetime(date_header)
server_ts = dt.timestamp()
if abs(server_ts - now) > 3600:
continue
self._feishu_time_cache["timestamp"] = server_ts
self._feishu_time_cache["fetched_at"] = now
return str(int(server_ts))
except Exception:
continue
except Exception:
pass

self._feishu_time_cache["timestamp"] = now
self._feishu_time_cache["fetched_at"] = now
return str(int(now))

async def _async_post(self, url: str, json_data: dict[str, Any], proxy: str | None = None) -> dict[str, Any]:
try:
Expand All @@ -26,6 +69,60 @@ async def _async_post(self, url: str, json_data: dict[str, Any], proxy: str | No
logger.info(f"Push failed, push address: {url}, Error message: {e}")
return {"error": str(e)}

@staticmethod
def _build_feishu_payload(title: str, content: str, msg_type: str, language: str) -> dict[str, Any]:
if msg_type == "post":
lines = [line for line in content.splitlines() if line.strip()]
if not lines:
lines = [content]
post_content = [[{"tag": "text", "text": line}] for line in lines]
return {
"msg_type": "post",
"content": {"post": {language: {"title": title, "content": post_content}}},
}

return {"msg_type": "text", "content": {"text": content}}

@staticmethod
def _generate_feishu_sign(secret: str, timestamp: str) -> str:
key = f"{timestamp}\n{secret}".encode("utf-8")
msg = b""
digest = hmac.new(key, msg, hashlib.sha256).digest()
return base64.b64encode(digest).decode("utf-8")

async def send_to_feishu(
self,
url: str,
title: str,
content: str,
msg_type: str = "text",
sign_secret: str | None = None,
language: str = "zh_cn",
) -> dict[str, Any]:
results = {"success": [], "error": []}
sign_secret = sign_secret.strip() if sign_secret else None
timestamp: str | None = None
if sign_secret:
timestamp = await self._get_feishu_timestamp()
api_list = [u.strip() for u in url.replace(",", ",").split(",") if u.strip()]
for api in api_list:
payload = self._build_feishu_payload(title=title, content=content, msg_type=msg_type, language=language)
if sign_secret:
payload["timestamp"] = timestamp
payload["sign"] = self._generate_feishu_sign(secret=sign_secret, timestamp=timestamp)

resp = await self._async_post(api, payload)
if resp.get("StatusCode") == 0 or resp.get("code") == 0:
results["success"].append(api)
else:
results["error"].append(api)
if "error" in resp:
logger.info(f"Feishu push failed, push address: {api}, Failure message: {resp.get('error')}")
else:
logger.info(f"Feishu push failed, push address: {api}, Failure message: {resp}")

return results

async def send_to_dingtalk(
self, url: str, content: str, number: Optional[str] = None, is_atall: bool = False
) -> dict[str, list[str]]:
Expand Down Expand Up @@ -234,20 +331,3 @@ async def send_to_serverchan(
logger.info(f"ServerChan push failed, SCKEY/SendKey: {key}, Error message: {resp.get('message')}")

return results

async def send_to_feishu(
self, url: str, content: str
) -> dict[str, list[str]]:
results = {"success": [], "error": []}
api_list = [u.strip() for u in url.replace(",", ",").split(",") if u.strip()]
for api in api_list:
json_data = {
"msg_type": "text",
"content": {"text": content}
}
resp = await self._async_post(api, json_data)
if resp.get("msg") == 'success':
results["success"].append(api)
else:
results["error"].append(api)
return results
Loading