import base64
import hashlib
import hmac
import os
import sqlite3
import sys
import time
from datetime import datetime, timedelta
from urllib import parse
from urllib.parse import quote
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from loguru import logger

# --- Environment configuration -------------------------------------------

# URL of the International Exchange and Cooperation notice listing page.
IEC_ROOT_URL = os.getenv("IEC_ROOT_URL")
if not IEC_ROOT_URL:
    logger.warning("Failed to read the link address from environment, fallback to default url.")
    IEC_ROOT_URL = "https://guoji.zstu.edu.cn/index/tzgg.htm"

# DingTalk bot credentials.
# SECURITY NOTE(review): hard-coded fallback credentials are committed with the
# source. Rotate them and rely exclusively on environment variables in
# production deployments.
DING_TALK_BOT_SECRET = os.getenv("DING_TALK_BOT_SECRET")
if not DING_TALK_BOT_SECRET:
    logger.warning("Failed to read the secret from environment, fallback to default secret.")
    DING_TALK_BOT_SECRET = "SEC047bc54ab057cfdb7238ced6daa1a7ba647452462befc2a74ee113b7fc724ea3"

DING_TALK_BOT_ACCESS_TOKEN = os.getenv("DING_TALK_BOT_ACCESS_TOKEN")
if not DING_TALK_BOT_ACCESS_TOKEN:
    logger.warning("Failed to read the access token from environment, fallback to default access token.")
    DING_TALK_BOT_ACCESS_TOKEN = "0e2caf719ff4df76b01fc167c568e3c28fc308633a72763522b6639d36f8ef4a"

# Database cleanup configuration.
# Notices older than CLEANUP_DAYS days are deleted.
CLEANUP_DAYS = os.getenv("DB_CLEANUP_DAYS")
if not CLEANUP_DAYS:
    logger.warning("Failed to read the database cleanup days from environment, fallback to default days")
    CLEANUP_DAYS = 30
else:
    # BUG FIX: environment variables are strings; timedelta(days=...) needs an int.
    CLEANUP_DAYS = int(CLEANUP_DAYS)

# Cleanup runs at most once every CLEANUP_INTERVAL_DAYS days.
CLEANUP_INTERVAL_DAYS = os.getenv("DB_CLEANUP_INTERVAL_DAYS")
if not CLEANUP_INTERVAL_DAYS:
    logger.warning("Failed to read the database cleanup interval days from environment, fallback to default days")
    CLEANUP_INTERVAL_DAYS = 30
else:
    # BUG FIX: comparing delta.days >= "30" raises TypeError; convert to int.
    CLEANUP_INTERVAL_DAYS = int(CLEANUP_INTERVAL_DAYS)

# --- Globals ---------------------------------------------------------------

# Request headers used for all HTTP fetches (plain desktop browser UA).
REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}

# Database file location.
# DB_FILE_LOCATION = "/var/lib/notice/data/history.db"
DB_FILE_LOCATION = "./data/history.db"

# DingTalk bot webhook endpoint.
DING_TALK_BOT_WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send"

# Default network timeout (seconds) so a stalled server cannot hang the job.
HTTP_TIMEOUT = 30


def fetch_content(func_url_header, func_url):
    """Fetch *func_url* with *func_url_header* and return the response text.

    Exits the process with status 1 when the request does not return
    HTTP 200 with a non-empty body.
    """
    # BUG FIX: the original passed func_url_header for BOTH placeholders,
    # logging the headers twice and never logging the URL.
    logger.debug("Fetching content from {} with request headers {}".format(func_url, func_url_header))
    # BUG FIX: added a timeout so a hung server cannot block the job forever.
    raw_response = requests.get(func_url, headers=func_url_header, timeout=HTTP_TIMEOUT)
    raw_response.encoding = "utf-8"
    if raw_response.status_code == 200 and len(raw_response.text) != 0:
        logger.success("Success in fetching content.")
        return raw_response.text
    logger.critical("Failed in fetching content, please check the url.")
    sys.exit(1)


def resolve_iec_content(func_content):
    """Parse the notice listing HTML and return a list of notice dicts.

    Each dict has keys ``title``, ``url`` (absolute, resolved against
    IEC_ROOT_URL) and ``date``.
    """
    notices = []
    soup = BeautifulSoup(func_content, "lxml")
    items = soup.select("div.sub_list > ul > li")
    logger.debug("Parsing content {}".format(items))
    logger.info("Resolving {} elements.".format(len(items)))
    for item in items:
        a_tag = item.find("a")
        if not a_tag:
            continue  # skip list items without a link
        title = a_tag.get_text(strip=True)
        href = a_tag.get("href")
        # Relative hrefs are resolved against the listing page URL.
        full_url = urljoin(IEC_ROOT_URL, href) if href else ""
        date_span = item.find("span")
        date = date_span.get_text(strip=True) if date_span else ""
        notices.append({"title": title, "url": full_url, "date": date})
    logger.info("Parsed {} notices.".format(len(notices)))
    logger.success("Finished resolving International Exchange and Cooperation notices.")
    return notices


def init_db():
    """Create the SQLite database (and its directory) if they do not exist."""
    # BUG FIX: sqlite3.connect raises OperationalError when the parent
    # directory is missing; create it up front.
    db_dir = os.path.dirname(DB_FILE_LOCATION)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    connection = sqlite3.connect(DB_FILE_LOCATION)
    cursor = connection.cursor()
    # Notice history; the UNIQUE url constraint is what de-duplicates inserts.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS iec_notice (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            url TEXT UNIQUE NOT NULL,
            date DATE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Key/value store for bookkeeping such as the last cleanup timestamp.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS metadata (
            key TEXT PRIMARY KEY NOT NULL,
            value TEXT
        )
    ''')
    connection.commit()
    connection.close()
    logger.info("Database initialized.")


def save_notices(notices):
    """Insert *notices* into the database, skipping known URLs.

    Returns the list of notices that were actually new (possibly empty).
    """
    new_notices = []
    if not notices:
        logger.warning("No notices to save.")
        # BUG FIX: return an empty list instead of None so callers can
        # safely take len() / iterate over the result.
        return new_notices
    connection = sqlite3.connect(DB_FILE_LOCATION)
    cursor = connection.cursor()
    insert_num = 0
    for notice in notices:
        logger.debug("Saving notice {}".format(notice))
        try:
            cursor.execute('''
                INSERT INTO iec_notice (title, url, date)
                VALUES (?, ?, ?)
            ''', (notice["title"], notice["url"], notice["date"]))
            insert_num += 1
            new_notices.append(notice)
        except sqlite3.IntegrityError:
            # UNIQUE(url) violation means we have seen this notice before.
            logger.debug("Duplicate notice found, skipping.")
    connection.commit()
    connection.close()
    logger.info("Saved {} notices successfully.", insert_num)
    return new_notices


def get_today_notices():
    """Return today's notices as a list of {'title', 'url'} dicts."""
    connection = sqlite3.connect(DB_FILE_LOCATION)
    connection.row_factory = sqlite3.Row
    cursor = connection.cursor()
    cursor.execute('''
        SELECT title, url FROM iec_notice WHERE date = date('now')
    ''')
    rows = cursor.fetchall()
    notices = [dict(row) for row in rows]
    connection.close()
    return notices


def generate_url(func_secret):
    """Build the signed DingTalk webhook URL for *func_secret*.

    Implements the HMAC-SHA256 signing scheme documented for DingTalk
    custom robots: sign = base64(hmac_sha256(secret, "{timestamp}\\n{secret}")).
    """
    timestamp = str(round(time.time() * 1000))
    string_to_sign = "{}\n{}".format(timestamp, func_secret)
    hmac_code = hmac.new(key=func_secret.encode("utf-8"), msg=string_to_sign.encode("utf-8"),
                         digestmod=hashlib.sha256).digest()
    sign = base64.b64encode(hmac_code).decode("utf-8")
    # BUG FIX: the original ran quote() on the sign and then urlencode(),
    # which percent-encodes the '%' characters a second time and yields an
    # invalid signature whenever the base64 value contains '+', '/' or '='.
    # urlencode() already performs the single round of percent-encoding
    # DingTalk expects.
    params = {"access_token": DING_TALK_BOT_ACCESS_TOKEN, "timestamp": timestamp, "sign": sign}
    return f"{DING_TALK_BOT_WEBHOOK_URL}?{parse.urlencode(params)}"


def send_dingtalk_msg(func_url, func_msg_type, func_content):
    """POST a markdown message to the DingTalk webhook *func_url*.

    NOTE: the payload body is always shaped as a markdown message;
    *func_msg_type* only fills the ``msgtype`` field.
    Exits the process with status 1 on transport or API-level failure.
    """
    msg_header = {"Content-Type": "application/json", }
    msg_body = {"msgtype": func_msg_type, "markdown": {"title": "新通知发布通知", "text": func_content}}
    # BUG FIX: added a timeout so a hung server cannot block the job forever.
    response = requests.post(func_url, json=msg_body, headers=msg_header, timeout=HTTP_TIMEOUT)
    response.encoding = "utf-8"
    if response.status_code != 200:
        logger.critical("Request failed, please check your internet connection.")
        sys.exit(1)
    result = response.json()
    if result.get("errcode") == 0:
        logger.success("DingTalk msg sent.")
    else:
        logger.critical("DingTalk msg failed, please check the error message.")
        sys.exit(1)


def get_last_cleanup():
    """Return the last cleanup time from the metadata table (ISO string) or None."""
    conn = sqlite3.connect(DB_FILE_LOCATION)
    c = conn.cursor()
    c.execute("SELECT value FROM metadata WHERE key = 'last_cleanup'")
    row = c.fetchone()
    conn.close()
    return row[0] if row else None


def set_last_cleanup():
    """Record the current time as the last cleanup time in the metadata table."""
    now = datetime.now().isoformat()
    conn = sqlite3.connect(DB_FILE_LOCATION)
    c = conn.cursor()
    c.execute('''
        INSERT OR REPLACE INTO metadata (key, value) VALUES ('last_cleanup', ?)
    ''', (now,))
    conn.commit()
    conn.close()


def should_cleanup():
    """Return True when a cleanup is due (based on CLEANUP_INTERVAL_DAYS)."""
    last = get_last_cleanup()
    if not last:
        return True  # never cleaned up before
    last_time = datetime.fromisoformat(last)
    delta = datetime.now() - last_time
    return delta.days >= CLEANUP_INTERVAL_DAYS


def cleanup_old_notices():
    """Delete notices older than CLEANUP_DAYS, VACUUM, and record the run."""
    cutoff_date = (datetime.now() - timedelta(days=CLEANUP_DAYS)).date()
    cutoff_str = cutoff_date.isoformat()
    conn = sqlite3.connect(DB_FILE_LOCATION)
    cursor = conn.cursor()
    # Count first so we can short-circuit (and log) when nothing matches.
    cursor.execute("SELECT COUNT(*) FROM iec_notice WHERE date < ?", (cutoff_str,))
    count = cursor.fetchone()[0]
    if count == 0:
        logger.info("No notice to cleanup.")
        conn.close()
        return
    logger.info("Discovered {} notices that earlier than {}, prepare to delete them.", count, cutoff_str)
    cursor.execute("DELETE FROM iec_notice WHERE date < ?", (cutoff_str,))
    deleted = cursor.rowcount
    conn.commit()
    logger.success("{} notices have been deleted.", deleted)
    # VACUUM reclaims the space freed by the delete.
    logger.info("Executing vacuum")
    cursor.execute("VACUUM")
    conn.commit()
    conn.close()
    logger.success("Vacuum done.")
    set_last_cleanup()


def main():
    """Fetch, store, notify about new notices, and run periodic DB cleanup."""
    raw_content = fetch_content(REQUEST_HEADERS, IEC_ROOT_URL)
    notices = resolve_iec_content(raw_content)
    init_db()
    new_notices = save_notices(notices)
    # save_notices always returns a list (possibly empty) now.
    if new_notices:
        content_lines = ["# 有新通知请注意查看!\n"]
        for notice in new_notices:
            line = f"- [{notice['title']}]({notice['url']})"
            content_lines.append(line)
        content = "\n".join(content_lines)
        send_dingtalk_msg(generate_url(DING_TALK_BOT_SECRET), "markdown", content)
    if should_cleanup():
        logger.info("Database should be cleaned up.")
        cleanup_old_notices()
    else:
        logger.info("Skip database cleaned up.")


if __name__ == "__main__":
    main()