Files
get_notice/main.py

274 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import hashlib
import hmac
import os
import sqlite3
import sys
import time
from datetime import datetime, timedelta
from urllib import parse
from urllib.parse import quote
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from loguru import logger
# --- Environment-driven configuration --------------------------------------
# Each setting is read from the environment and falls back to a built-in
# default when absent.

# Listing page for International Exchange and Cooperation notices.
IEC_ROOT_URL = os.getenv("IEC_ROOT_URL")
if not IEC_ROOT_URL:
    logger.warning("Failed to read the link address from environment, fallback to default url.")
    IEC_ROOT_URL = "https://guoji.zstu.edu.cn/index/tzgg.htm"

# DingTalk robot credentials.
# SECURITY NOTE(review): hard-coded fallback credentials live in source
# control; rotate them and remove the fallbacks before publishing this repo.
DING_TALK_BOT_SECRET = os.getenv("DING_TALK_BOT_SECRET")
if not DING_TALK_BOT_SECRET:
    logger.warning("Failed to read the secret from environment, fallback to default secret.")
    DING_TALK_BOT_SECRET = "SEC047bc54ab057cfdb7238ced6daa1a7ba647452462befc2a74ee113b7fc724ea3"
DING_TALK_BOT_ACCESS_TOKEN = os.getenv("DING_TALK_BOT_ACCESS_TOKEN")
if not DING_TALK_BOT_ACCESS_TOKEN:
    logger.warning("Failed to read the access token from environment, fallback to default access token.")
    DING_TALK_BOT_ACCESS_TOKEN = "0e2caf719ff4df76b01fc167c568e3c28fc308633a72763522b6639d36f8ef4a"

# Database cleanup: delete notices older than CLEANUP_DAYS days, running the
# cleanup at most once every CLEANUP_INTERVAL_DAYS days.
# Bug fix: os.getenv returns strings, but these values feed timedelta(days=...)
# and an int ">=" comparison downstream — cast to int to avoid TypeError
# whenever the environment variables are actually set.
CLEANUP_DAYS = os.getenv("DB_CLEANUP_DAYS")
if not CLEANUP_DAYS:
    logger.warning("Failed to read the database cleanup days from environment, fallback to default days")
    CLEANUP_DAYS = 30
else:
    CLEANUP_DAYS = int(CLEANUP_DAYS)
CLEANUP_INTERVAL_DAYS = os.getenv("DB_CLEANUP_INTERVAL_DAYS")
if not CLEANUP_INTERVAL_DAYS:
    logger.warning("Failed to read the database cleanup interval days from environment, fallback to default days")
    CLEANUP_INTERVAL_DAYS = 30
else:
    CLEANUP_INTERVAL_DAYS = int(CLEANUP_INTERVAL_DAYS)

# --- Module-level constants -------------------------------------------------
# Browser-like User-Agent so the campus site serves the normal page.
REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
# SQLite database location (relative to the working directory).
# DB_FILE_LOCATION = "/var/lib/notice/data/history.db"
DB_FILE_LOCATION = "./data/history.db"
# DingTalk robot webhook endpoint (signed query string is appended later).
DING_TALK_BOT_WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send"
def fetch_content(func_url_header, func_url):
    """Download *func_url* and return the response body as text.

    Args:
        func_url_header: dict of HTTP request headers to send.
        func_url: the URL to fetch.

    Returns:
        The UTF-8 decoded response body.

    Exits:
        Terminates the process with status 1 on a non-200 status or an
        empty body.
    """
    # Bug fix: the original log line passed the headers for BOTH
    # placeholders and never logged the URL.
    logger.debug("Fetching content from {} with request headers {}".format(func_url, func_url_header))
    # A timeout keeps a stalled server from hanging the scheduled job forever.
    raw_response = requests.get(func_url, headers=func_url_header, timeout=30)
    raw_response.encoding = "utf-8"
    if raw_response.status_code == 200 and len(raw_response.text) != 0:
        logger.success("Success in fetching content.")
        return raw_response.text
    logger.critical("Failed in fetching content, please check the url.")
    sys.exit(1)
def resolve_iec_content(func_content):
    """Parse the notice listing HTML into a list of notice dicts.

    Args:
        func_content: raw HTML of the notice listing page.

    Returns:
        A list of ``{"title", "url", "date"}`` dicts, one per ``<li>`` entry
        that contains a link; ``url`` is made absolute against IEC_ROOT_URL.
    """
    soup = BeautifulSoup(func_content, "lxml")
    list_items = soup.select("div.sub_list > ul > li")
    logger.debug("Parsing content {}".format(list_items))
    logger.info("Resolving {} elements.".format(len(list_items)))
    parsed = []
    for entry in list_items:
        link = entry.find("a")
        # Entries without an anchor carry no notice — skip them.
        if not link:
            continue
        href = link.get("href")
        date_node = entry.find("span")
        parsed.append({
            "title": link.get_text(strip=True),
            "url": urljoin(IEC_ROOT_URL, href) if href else "",
            "date": date_node.get_text(strip=True) if date_node else "",
        })
    logger.info("Parsed {} notices.".format(len(parsed)))
    logger.success("Finished resolving International Exchange and Cooperation notices.")
    return parsed
def init_db():
    """Create the SQLite schema (notice and metadata tables) if absent.

    Robustness fix: sqlite3.connect cannot create missing parent
    directories, so ensure the data directory exists first (the default
    DB_FILE_LOCATION lives under ./data/). The connection is closed even
    if schema creation raises.
    """
    db_dir = os.path.dirname(DB_FILE_LOCATION)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    connection = sqlite3.connect(DB_FILE_LOCATION)
    try:
        cursor = connection.cursor()
        # Stored notices; url is UNIQUE so re-scraped items are rejected
        # with an IntegrityError (relied on by save_notices).
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS iec_notice
        (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            url TEXT UNIQUE NOT NULL,
            date DATE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''')
        # Simple key/value store, e.g. the 'last_cleanup' timestamp.
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS metadata
        (
            key TEXT PRIMARY KEY NOT NULL,
            value TEXT
        )
        ''')
        connection.commit()
    finally:
        connection.close()
    logger.info("Database initialized.")
def save_notices(notices):
    """Persist notices, skipping any whose url is already stored.

    Args:
        notices: list of ``{"title", "url", "date"}`` dicts.

    Returns:
        The list of notices that were newly inserted. Bug fix: always a
        list — the original returned None for empty input, which crashed
        main()'s ``len(new_notices)``.
    """
    new_notices = []
    if not notices:
        logger.warning("No notices to save.")
        return new_notices
    connection = sqlite3.connect(DB_FILE_LOCATION)
    try:
        cursor = connection.cursor()
        insert_num = 0
        for notice in notices:
            logger.debug("Saving notice {}".format(notice))
            try:
                cursor.execute('''
                INSERT INTO iec_notice (title, url, date)
                VALUES (?, ?, ?)
                ''', (notice["title"], notice["url"], notice["date"]))
                insert_num += 1
                new_notices.append(notice)
            except sqlite3.IntegrityError:
                # UNIQUE(url) violation — this notice was seen on an
                # earlier run.
                logger.debug("Duplicate notice found, skipping.")
        connection.commit()
    finally:
        connection.close()
    logger.info("Saved {} notices successfully.", insert_num)
    return new_notices
def get_today_notices():
    """Return all notices whose date equals today's date (server time).

    Returns:
        A list of ``{"title", "url"}`` dicts; empty when nothing matches.
    """
    connection = sqlite3.connect(DB_FILE_LOCATION)
    # Row factory lets each result row convert straight into a dict.
    connection.row_factory = sqlite3.Row
    cursor = connection.cursor()
    cursor.execute('''
    SELECT title, url
    FROM iec_notice
    WHERE date = date('now')
    ''')
    result = [dict(record) for record in cursor.fetchall()]
    connection.close()
    return result
def generate_url(func_secret):
    """Build the signed DingTalk webhook URL for the current timestamp.

    Implements the DingTalk robot signature: HMAC-SHA256 of
    "<timestamp>\\n<secret>" keyed by the secret, base64-encoded, then
    URL-encoded into the query string.

    Args:
        func_secret: the robot's signing secret (SEC...).

    Returns:
        The full webhook URL with access_token, timestamp and sign params.
    """
    timestamp = str(round(time.time() * 1000))
    string_to_sign = "{}\n{}".format(timestamp, func_secret)
    hmac_code = hmac.new(key=func_secret.encode("utf-8"), msg=string_to_sign.encode("utf-8"),
                         digestmod=hashlib.sha256).digest()
    sign = base64.b64encode(hmac_code).decode("utf-8")
    # Bug fix: the signature was percent-encoded with quote() and then
    # percent-encoded AGAIN by urlencode(), producing a double-encoded sign
    # ('%' -> '%25') that fails DingTalk's signature check. urlencode()
    # already escapes each value, so pass the raw base64 signature through.
    params = {"access_token": DING_TALK_BOT_ACCESS_TOKEN, "timestamp": timestamp, "sign": sign}
    return f"{DING_TALK_BOT_WEBHOOK_URL}?{parse.urlencode(params)}"
def send_dingtalk_msg(func_url, func_msg_type, func_content):
    """POST a message to the DingTalk robot webhook, exiting on failure.

    Args:
        func_url: signed webhook URL (see generate_url).
        func_msg_type: value for the payload's "msgtype" field.
        func_content: markdown text body of the message.

    NOTE(review): the payload hard-codes the "markdown" envelope regardless
    of func_msg_type — only markdown messages can render correctly; confirm
    before passing any other type.
    """
    headers = {"Content-Type": "application/json", }
    payload = {"msgtype": func_msg_type, "markdown": {"title": "新通知发布通知", "text": func_content}}
    response = requests.post(func_url, json=payload, headers=headers)
    response.encoding = "utf-8"
    # Transport-level failure: anything but HTTP 200 is fatal.
    if response.status_code != 200:
        logger.critical("Request failed, please check your internet connection.")
        sys.exit(1)
    # Application-level result: DingTalk reports success as errcode == 0.
    if response.json().get("errcode") == 0:
        logger.success("DingTalk msg sent.")
        return
    logger.critical("DingTalk msg failed, please check the error message.")
    sys.exit(1)
def get_last_cleanup():
    """Fetch the last-cleanup timestamp (ISO string) from the metadata table.

    Returns:
        The stored ISO-format timestamp string, or None when no cleanup
        has ever been recorded.
    """
    connection = sqlite3.connect(DB_FILE_LOCATION)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT value FROM metadata WHERE key = 'last_cleanup'")
        record = cursor.fetchone()
    finally:
        connection.close()
    return record[0] if record else None
def set_last_cleanup():
    """Record the current time as the last-cleanup timestamp in metadata."""
    timestamp = datetime.now().isoformat()
    connection = sqlite3.connect(DB_FILE_LOCATION)
    try:
        # INSERT OR REPLACE makes this an upsert on the 'last_cleanup' key.
        connection.cursor().execute('''
        INSERT OR REPLACE INTO metadata (key, value)
        VALUES ('last_cleanup', ?)
        ''', (timestamp,))
        connection.commit()
    finally:
        connection.close()
def should_cleanup():
    """Decide whether the periodic cleanup is due.

    Returns:
        True when no cleanup has ever run, or when at least
        CLEANUP_INTERVAL_DAYS full days have elapsed since the last one.
    """
    last = get_last_cleanup()
    if not last:
        return True  # never cleaned up before
    elapsed = datetime.now() - datetime.fromisoformat(last)
    # Bug fix: int() guards against CLEANUP_INTERVAL_DAYS arriving as a
    # string from os.getenv — comparing int >= str raises TypeError.
    return elapsed.days >= int(CLEANUP_INTERVAL_DAYS)
def cleanup_old_notices():
    """Delete notices older than CLEANUP_DAYS days and VACUUM the database.

    Records the run via set_last_cleanup so should_cleanup honors the
    configured interval.
    """
    # Bug fix: int() guards against CLEANUP_DAYS arriving as a string from
    # os.getenv — timedelta(days="30") raises TypeError.
    cutoff_date = (datetime.now() - timedelta(days=int(CLEANUP_DAYS))).date()
    cutoff_str = cutoff_date.isoformat()
    conn = sqlite3.connect(DB_FILE_LOCATION)
    cursor = conn.cursor()
    # Count first, purely for logging.
    cursor.execute("SELECT COUNT(*) FROM iec_notice WHERE date < ?", (cutoff_str,))
    count = cursor.fetchone()[0]
    if count == 0:
        logger.info("No notice to cleanup.")
        conn.close()
        # Fix: record this run too — otherwise should_cleanup returns True
        # on every invocation and the interval setting is never honored.
        set_last_cleanup()
        return
    logger.info("Discovered {} notices that earlier than {}, prepare to delete them.", count, cutoff_str)
    # Perform the deletion.
    cursor.execute("DELETE FROM iec_notice WHERE date < ?", (cutoff_str,))
    deleted = cursor.rowcount
    conn.commit()
    logger.success("{} notices have been deleted.", deleted)
    # VACUUM reclaims the freed pages from the database file.
    logger.info("Executing vacuum")
    cursor.execute("VACUUM")
    conn.commit()
    conn.close()
    logger.success("Vacuum done.")
    # Persist the cleanup time for should_cleanup.
    set_last_cleanup()
def main():
    """Fetch the notice page, persist new notices, push them, run cleanup."""
    raw_content = fetch_content(REQUEST_HEADERS, IEC_ROOT_URL)
    notices = resolve_iec_content(raw_content)
    init_db()
    new_notices = save_notices(notices)
    # Bug fix: save_notices may return None when there was nothing to save;
    # a truthiness check avoids the original len(None) TypeError and also
    # skips the push when nothing is new.
    if new_notices:
        content_lines = ["# 有新通知请注意查看!\n"]
        for notice in new_notices:
            content_lines.append(f"- [{notice['title']}]({notice['url']})")
        content = "\n".join(content_lines)
        send_dingtalk_msg(generate_url(DING_TALK_BOT_SECRET), "markdown", content)
    if should_cleanup():
        logger.info("Database should be cleaned up.")
        cleanup_old_notices()
    else:
        logger.info("Skip database cleaned up.")


if __name__ == "__main__":
    main()