grade_query / src /script.py
jzyg123's picture
Update src/script.py
6bb04a4 verified
# https://ynsdfz.aliwork.com/APP_DKQG7TSUK6ZNU3RDFGVO/workbench?corpid=dingd16cf4422967594bf2c783f7214b6d69&dd_addcookie=true&from_login=success&login_host=ynsdfz.aliwork.com&login_pt=4-296-307-385&dtcode=e1bec99a908e374fa77d5e8a06109972&code_type=jsapi&dd_enable_replace=true&ddtab=true
import json
import requests
import os
import re
import subprocess
from datetime import datetime
from flask import Flask, render_template, request, jsonify
import socket
from xpinyin import Pinyin
app = Flask(__name__)
p = Pinyin()
STUDENTS_DATA = []
STUDENTS_FILE = "students.json"
DOT_NOTIFY_URL = "https://dot.mindreset.tech/api/open/text"
DOT_AUTH_TOKEN = "dot_app_XdfKhLlhOiyNWfSepKUytXlxwBfERNkiFZzYYkRrLOeXVjOIacBEjhqJDedXNKkw"
DOT_ICON = ""
def load_students_data():
global STUDENTS_DATA
try:
if os.path.exists(STUDENTS_FILE):
with open(STUDENTS_FILE, 'r', encoding='utf-8') as f:
STUDENTS_DATA = json.load(f)
print(f"✅ 成功加载学生数据,共 {len(STUDENTS_DATA)} 条记录")
else:
print(f"⚠️ 学生数据文件 {STUDENTS_FILE} 不存在")
STUDENTS_DATA = []
except Exception as e:
print(f"❌ 加载学生数据失败: {e}")
STUDENTS_DATA = []
class DingTalkFormQueryClient:
def __init__(self):
self.base_url = "https://ynsdfz.aliwork.com/dingtalk/web/APP_DKQG7TSUK6ZNU3RDFGVO/v1/form/searchFormDatas.json"
self.form_uuid = "FORM-72EF2444A3564C6D8020F1CDC36487EDHWE1"
self.cookies_file = "cookies.txt"
# 字段映射表
self.field_mapping = {
# 基本信息
'textField_mhsv4lgl': '姓名', # 新版表单
'radioField_ts1jbr0': '姓名', # 旧版表单
'textField_lqrf9fj': '学号',
'textField_sm64ysf': '考试名称',
'textField_mhsv4lgk': '身份证号', # 新版表单
'radioField_8a4voiy': '身份证号', # 旧版表单
# 语文
'textField_4ecclpd': '语文',
'textField_jqsfnel': '语文校次',
# 数学
'textField_djh9jbz': '数学',
'textField_sm6gl9b': '数学校次',
# 英语
'textField_s5awu09': '英语',
'textField_yrtoxm6': '英语校次',
# 物理
'textField_j7nxew8': '物理',
'textField_8iavn0r': '物理校次',
# 历史
'textField_7vh26my': '历史',
'textField_3nigx7n': '历史校次',
# 化学
'textField_d9v8hqi': '化学',
'textField_eecpqqn': '化学校次',
# 生物
'textField_6mk8ivr': '生物',
'textField_gjcqbzz': '生物校次',
# 政治
'textField_mqmwhoa': '政治',
'textField_q60j1vs': '政治校次',
# 地理
'textField_5iiw6pt': '地理',
'textField_xshus0b': '地理校次',
# 总分和排名
'textField_4bphzli': '总分',
'textField_y4uc1lx': '组合排名',
'textField_a2l7vuc': '大类排名',
}
# 查询身份证号时优先尝试的字段顺序(兼容新旧表单)
self.id_field_priority = [
'textField_mhsv4lgk', # 新版
'radioField_8a4voiy', # 旧版
]
# 考试排序列表(时间正序)
self.exam_order = [
"26届高一上中",
"26届高一上末",
"26届高一下中",
"26届高一下末",
"26届高二上中",
"26届高二上末",
"26届高二下中",
"26届高二下末",
"26届高二月考1",
"26届高三月考2",
"26届高三月考3",
"26届高三月考4"
]
def load_cookies_from_file(self):
"""从文件加载cookies"""
if not os.path.exists(self.cookies_file):
# 在Web应用中,打印到控制台可能不是最佳选择,但为了简单起见,暂时保留
print(f"❌ Cookie文件 {self.cookies_file} 不存在")
return None
try:
with open(self.cookies_file, 'r', encoding='utf-8') as f:
cookie_string = f.read().strip()
if not cookie_string:
print(f"❌ Cookie文件 {self.cookies_file} 为空")
return None
cookies = {}
for item in cookie_string.split(';'):
if '=' in item:
key, value = item.strip().split('=', 1)
cookies[key.strip()] = value.strip()
print(f"✅ 成功从 {self.cookies_file} 加载cookies") # 控制台日志
return cookies
except Exception as e:
print(f"❌ 读取cookie文件失败: {e}") # 控制台日志
return None
def extract_csrf_token(self, cookies):
"""从cookies中提取CSRF令牌"""
csrf_token = cookies.get('tianshu_csrf_token')
if not csrf_token:
print("❌ 在cookies中未找到tianshu_csrf_token") # 控制台日志
return None
return csrf_token
def validate_id_number(self, id_number):
"""验证身份证号格式"""
pattern = r'^\d{17}[\dXx]$'
if not re.match(pattern, id_number):
return False
return True
def build_request_url(self, id_number, csrf_token, search_field_id):
"""构建请求URL"""
if not search_field_id:
raise ValueError("search_field_id 不能为空")
search_json = json.dumps({search_field_id: id_number})
params = {
'formUuid': self.form_uuid,
'searchFieldJson': search_json,
'currentPage': 1,
'pageSize': 30,
'_csrf_token': csrf_token
}
param_string = '&'.join([f"{k}={requests.utils.quote(str(v))}" for k, v in params.items()])
return f"{self.base_url}?{param_string}"
def query_scores(self, id_number):
"""查询成绩 - 修复版本"""
# 验证身份证号
if not self.validate_id_number(id_number):
return {"success": False, "error": "身份证号格式不正确,请输入18位身份证号"}
# 加载cookies
cookies = self.load_cookies_from_file()
if not cookies:
return {"success": False, "error": f"无法加载cookies,请检查服务器端的 {self.cookies_file} 文件"}
# 提取CSRF令牌
csrf_token = self.extract_csrf_token(cookies)
if not csrf_token:
return {"success": False, "error": "无法提取CSRF令牌,请检查cookie中的tianshu_csrf_token"}
headers = {
'Accept': 'application/json, text/json',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'Referer': 'https://ynsdfz.aliwork.com/APP_DKQG7TSUK6ZNU3RDFGVO/workbench',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'X-Requested-With': 'XMLHttpRequest'
}
last_success_empty = None
for search_field in self.id_field_priority:
result = self._perform_query(
id_number=id_number,
cookies=cookies,
csrf_token=csrf_token,
headers=headers,
search_field=search_field
)
if not result.get("success", False):
# 直接返回错误信息
return result
if result.get("data"):
if search_field != self.id_field_priority[0]:
print(f"ℹ️ 通过备用字段 {search_field} 匹配到成绩数据")
return result
last_success_empty = result
# 所有字段都无数据,返回最后一次成功但无记录的结果或通用提示
if last_success_empty:
return last_success_empty
return {"success": True, "data": [], "message": "未找到相关成绩记录"}
def _perform_query(self, id_number, cookies, csrf_token, headers, search_field):
"""执行实际的HTTP查询,并统一处理异常"""
try:
url = self.build_request_url(id_number, csrf_token, search_field)
except ValueError as e:
return {"success": False, "error": str(e)}
try:
print(f"🔍 正在使用字段 {search_field} 查询身份证号: {id_number}") # 控制台日志
response = requests.get(
url,
headers=headers,
cookies=cookies,
timeout=(10, 30)
)
print(f"📡 HTTP状态码: {response.status_code}") # 控制台日志
if response.status_code != 200:
return {"success": False, "error": f"HTTP请求失败,状态码: {response.status_code}"}
try:
data = response.json()
print(f"📦 收到响应数据") # 控制台日志
except json.JSONDecodeError as e:
print(f"❌ JSON解析失败: {e}") # 控制台日志
return {"success": False, "error": "服务器返回的不是有效的JSON格式"}
parsed = self._parse_response(data)
if parsed.get("success") and not parsed.get("data"):
parsed.setdefault("message", "未找到相关成绩记录")
return parsed
except requests.exceptions.Timeout:
return {"success": False, "error": "请求超时,请检查网络连接"}
except requests.exceptions.ConnectionError:
return {"success": False, "error": "网络连接错误,请检查网络状态"}
except Exception as e:
return {"success": False, "error": f"请求失败: {e}"}
def _parse_response(self, data):
"""解析服务器响应 - 修复核心逻辑"""
try:
if not isinstance(data, dict):
return {"success": False, "error": "服务器返回的数据格式错误:不是字典类型"}
print(f"📋 响应字段: {list(data.keys())}") # 控制台日志
success = data.get('success')
if success is False:
error_msg = data.get('errorMsg', '未知API错误')
return {"success": False, "error": f"API返回错误: {error_msg}"}
if success is not True:
print(f"⚠️ success字段值: {success}") # 控制台日志
content = data.get('content')
if not content:
return {"success": False, "error": "响应中缺少content字段"}
if not isinstance(content, dict):
return {"success": False, "error": "content字段格式错误"}
records = content.get('data')
if records is None: # 可能表示没有数据,但请求本身是成功的
return {"success": True, "data": [], "message": "未找到相关成绩记录"}
if not isinstance(records, list):
return {"success": False, "error": "data字段不是数组格式"}
if len(records) == 0:
return {"success": True, "data": [], "message": "未找到相关成绩记录"}
print(f"📊 找到 {len(records)} 条成绩记录") # 控制台日志
processed_data = self.process_score_data(records)
return {"success": True, "data": processed_data}
except Exception as e:
return {"success": False, "error": f"解析响应数据时出错: {e}"}
def process_score_data(self, records):
"""处理成绩数据"""
processed_records = []
for i, record in enumerate(records):
try:
# print(f"处理第 {i+1} 条记录...") # 控制台日志,可选择性保留
form_data = record.get('formData', {})
if not form_data:
# print(f"⚠️ 第 {i+1} 条记录缺少formData") # 控制台日志
continue
basic_info = {
'考试时间': self.timestamp_to_date(record.get('gmtCreate', 0)),
'标题': record.get('title', ''),
'表单ID': record.get('formInstId', '')
}
mapped_data = {}
for field_id, value in form_data.items():
field_name = self.field_mapping.get(field_id, field_id)
if value == "" or value == "-" or value is None:
mapped_data[field_name] = "无数据"
else:
mapped_data[field_name] = str(value)
processed_record = {**basic_info, **mapped_data}
processed_records.append(processed_record)
except Exception as e:
# print(f"⚠️ 处理第 {i+1} 条记录时出错: {e}") # 控制台日志
continue
# 对成绩记录进行排序
def get_sort_key(record):
exam_name = record.get('考试名称', '')
try:
return self.exam_order.index(exam_name)
except ValueError:
# 如果不在列表中,排在最后
return len(self.exam_order) + 1
processed_records.sort(key=get_sort_key)
# 为每条记录添加排序索引,方便前端排序
for i, record in enumerate(processed_records):
record['sort_index'] = i
# 计算每次考试的趋势(相比上一次)
for i in range(len(processed_records)):
current = processed_records[i]
if i == 0:
current['rank_trend'] = "无变化"
current['score_trend'] = "N/A"
else:
prev = processed_records[i-1]
# 计算排名趋势
try:
curr_rank_str = str(current.get('大类排名', '0'))
prev_rank_str = str(prev.get('大类排名', '0'))
# 提取数字
curr_rank_match = re.search(r'\d+', curr_rank_str)
prev_rank_match = re.search(r'\d+', prev_rank_str)
if curr_rank_match and prev_rank_match:
curr_rank = int(curr_rank_match.group())
prev_rank = int(prev_rank_match.group())
diff = prev_rank - curr_rank
if diff == 0:
current['rank_trend'] = "无变化"
elif diff > 0:
current['rank_trend'] = f"↑{diff}名"
else:
current['rank_trend'] = f"↓{abs(diff)}名"
else:
current['rank_trend'] = "无数据"
except Exception:
current['rank_trend'] = "计算错误"
# 计算总分趋势
try:
curr_score_str = str(current.get('总分', '0'))
prev_score_str = str(prev.get('总分', '0'))
# 提取数字(支持小数)
curr_score_match = re.search(r'\d+(\.\d+)?', curr_score_str)
prev_score_match = re.search(r'\d+(\.\d+)?', prev_score_str)
if curr_score_match and prev_score_match:
curr_score = float(curr_score_match.group())
prev_score = float(prev_score_match.group())
diff = curr_score - prev_score
if diff == 0:
current['score_trend'] = "无变化"
elif diff > 0:
current['score_trend'] = f"↑{diff:.1f}"
else:
current['score_trend'] = f"↓{abs(diff):.1f}"
else:
current['score_trend'] = "无数据"
except Exception:
current['score_trend'] = "计算错误"
# 反转列表,使最新的考试排在前面
processed_records.reverse()
return processed_records
def timestamp_to_date(self, timestamp):
"""时间戳转日期"""
try:
if timestamp and timestamp > 0:
dt = datetime.fromtimestamp(timestamp / 1000)
return dt.strftime('%Y-%m-%d %H:%M:%S')
return "未知时间"
except:
return "时间转换错误"
def format_score_report(self, data):
"""格式化成绩报告"""
if not data:
return "无成绩数据"
report = []
report.append("=" * 80)
report.append(f"学生成绩查询报告")
report.append("=" * 80)
for i, record in enumerate(data, 1):
report.append(f"\n📊 考试记录 {i}:")
report.append("-" * 50)
# 基本信息
report.append(f"考试名称: {record.get('考试名称', '未知')}")
report.append(f"考试时间: {record.get('考试时间', '未知')}")
report.append(f"学生姓名: {record.get('姓名', '未知')}")
report.append(f"学号: {record.get('学号', '未知')}")
# 成绩信息
report.append("\n📝 各科成绩:")
subjects = ['语文', '数学', '英语', '物理', '化学', '生物', '历史', '政治', '地理']
for subject in subjects:
score = record.get(subject, '无数据')
rank = record.get(f"{subject}校次", '无数据')
if score != '无数据' or rank != '无数据':
report.append(f" {subject}: {score} (校次: {rank})")
# 总分和排名
report.append("\n🏆 总分与排名:")
report.append(f" 总分: {record.get('总分', '无数据')}")
report.append(f" 大类排名: {record.get('大类排名', '无数据')}")
report.append(f" 组合排名: {record.get('组合排名', '无数据')}")
return "\n".join(report)
def compute_rank_trend(self, records):
"""计算相比上次考试的组合排名变化。返回字符串如 '↓3名' 或 '↑2名' 或 '无变化'。"""
try:
# records 按时间倒序排列(最新的在最前面)
if not records or len(records) < 2:
return "无变化"
# 提取最近两次的组合排名字段并尝试解析为整数
def parse_rank(rec):
v = rec.get('大类排名') or rec.get('大类排名'.strip())
if not v:
return None
# 移除非数字字符
s = re.sub(r"[^0-9]", "", str(v))
return int(s) if s.isdigit() else None
latest = parse_rank(records[0]) # 最新的在第一个
prev = parse_rank(records[1]) # 上一次的在第二个
if latest is None or prev is None:
return "无数据"
diff = prev - latest
if diff == 0:
return "无变化"
if diff > 0:
return f"↑{diff}名"
return f"↓{abs(diff)}名"
except Exception:
return "计算错误"
def _get_local_ip(self):
"""获取本机局域网IP地址"""
try:
import subprocess
# 使用ifconfig获取所有IP地址
result = subprocess.run(['ifconfig'], capture_output=True, text=True)
if result.returncode == 0:
ips = []
lines = result.stdout.split('\n')
for line in lines:
if 'inet ' in line and '127.0.0.1' not in line and 'inet 169.254.' not in line:
parts = line.strip().split()
for i, part in enumerate(parts):
if part == 'inet' and i + 1 < len(parts):
ip = parts[i + 1]
ips.append(ip)
# 优先级:192.168.x.x > 10.x.x.x > 172.x.x.x > 其他
for ip in ips:
if ip.startswith('192.168.'):
return ip
for ip in ips:
if ip.startswith('10.'):
return ip
for ip in ips:
if ip.startswith('172.'):
return ip
if ips:
return ips[0]
except Exception:
# 备用方法
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
return local_ip
except Exception:
pass
return "127.0.0.1"
def send_dot_notification(self, title, message, signature, icon=None):
"""发送到 dot.mindreset.tech 的通知请求。"""
try:
url = DOT_NOTIFY_URL
headers = {
"Authorization": f"Bearer {DOT_AUTH_TOKEN}",
"Content-Type": "application/json"
}
# 获取本机局域网IP
local_ip = self._get_local_ip()
data = {
"refreshNow": False,
"deviceId": "E4B063CC56DC",
"title": title,
"message": message,
"signature": signature,
"icon": icon or DOT_ICON,
"link": f"https://{local_ip}:1111"
}
resp = requests.post(url, json=data, headers=headers, timeout=10)
try:
return resp.json()
except Exception:
return {"status": "non-json-response", "code": resp.status_code}
except Exception as e:
return {"error": str(e)}
# 全局客户端实例
client = DingTalkFormQueryClient()
# 应用启动时加载学生数据
load_students_data()
@app.route('/', methods=['GET'])
def index():
return render_template('index.html', result=None)
@app.route('/search_student', methods=['POST'])
def search_student():
"""根据姓名查找学生"""
name = request.form.get('name', '').strip()
if not name:
return jsonify({"success": False, "message": "请输入学生姓名"})
if not STUDENTS_DATA:
return jsonify({"success": False, "message": "学生数据未加载,请检查students.json文件"})
# 模糊匹配学生姓名(支持拼音)
matched_students = []
name_lower = name.lower()
for student in STUDENTS_DATA:
student_name = student.get("学生姓名", "")
if not student_name:
continue
# 1. 直接包含匹配
if name in student_name:
match_type = "name"
# 2. 拼音匹配
else:
# 获取全拼 (e.g. "zhouyangguang")
pinyin_full = p.get_pinyin(student_name, '').lower()
# 获取首字母 (e.g. "zyg")
pinyin_initials = p.get_initials(student_name, '').lower()
if name_lower in pinyin_full or name_lower in pinyin_initials:
match_type = "pinyin"
else:
match_type = None
if match_type:
# 直接显示完整身份证号,不进行掩码处理
id_number = student.get("学号", "")
matched_students.append({
"姓名": student_name,
"身份证号": id_number,
"显示身份证号": id_number,
"is_self": False
})
if not matched_students:
return jsonify({"success": False, "message": "未找到匹配的学生"})
# 添加调试信息
print(f"🔍 搜索姓名: {name}")
print(f"📋 找到 {len(matched_students)} 个匹配的学生")
return jsonify({"success": True, "students": matched_students})
@app.route('/compare')
def compare_page():
"""成绩比较页面"""
return render_template('compare.html')
@app.route('/api/get_student_scores', methods=['POST'])
def get_student_scores():
"""获取学生成绩数据API"""
id_number = request.form.get('id_number')
if not id_number:
return jsonify({"success": False, "error": "身份证号不能为空"})
result = client.query_scores(id_number)
return jsonify(result)
@app.route('/query', methods=['POST'])
def query():
id_number = request.form.get('id_number')
result = None
if not id_number:
result = {"success": False, "error": "身份证号不能为空"}
else:
result = client.query_scores(id_number)
# 如果返回成功且检测到周洋光,则计算趋势并发送通知
try:
# 查询学生姓名是否为周洋光:在 students.json 中查找
student_name = None
for s in STUDENTS_DATA:
if s.get('学号') == id_number:
student_name = s.get('学生姓名')
break
if student_name == '周洋光' and result.get('success') and result.get('data'):
print("✅ 检测到周洋光,准备发送通知...")
trend = client.compute_rank_trend(result.get('data'))
# 取最新考试名称(列表已按倒序排列,取第一个)
latest_exam = result.get('data')[0].get('考试名称', '') if result.get('data') else ''
title = f"本次考试排名: {result.get('data')[0].get('大类排名','最新排名')}名"
message = f"相比上次考试趋势: {trend}"
signature = latest_exam
send_resp = client.send_dot_notification(title, message, signature)
print(f"通知发送结果: {send_resp}")
# 将趋势返回给客户端
result['trend'] = trend
except Exception as e:
print(f"发送通知或计算趋势时出错: {e}")
# 准备传递给模板的数据
template_data = {
'id_number_submitted': id_number,
'success': result.get('success'),
'error_message': result.get('error'),
'message': result.get('message'),
'scores_data': result.get('data'),
'trend': result.get('trend') # 添加趋势数据传递给模板
}
# 传递 self_name,用于模板判断显示“本人”徽章
self_name = None
for s in STUDENTS_DATA:
if s.get('学号') == id_number:
self_name = s.get('学生姓名')
break
return render_template('index.html', result=template_data, self_name=self_name)
def main():
"""主函数 - 改为启动Flask服务器"""
# 确保 templates 文件夹存在
if not os.path.exists("templates"):
os.makedirs("templates")
print("创建 'templates' 文件夹。请将 'index.html' 文件放入其中。")
# 提示信息
print("=" * 60)
print("钉钉表单成绩查询工具 - Web版")
print("=" * 60)
print("请在浏览器中打开 http://127.0.0.1:1111/ 进行访问")
print("注意:")
print(f"1. 请确保在程序目录下有 {client.cookies_file} 文件且内容正确")
print(f"2. 请确保在程序目录下有 {STUDENTS_FILE} 文件包含学生信息")
print("3. 服务器日志会显示在此控制台")
print("=" * 60)
app.run(host='0.0.0.0', port=1111, debug=True)
if __name__ == "__main__":
main()