Spaces:
Sleeping
Sleeping
| """ | |
| 与 app.py 一致的票务排程 API(Token、影厅排片),供多页面复用,避免 import app 时执行整站 UI。 | |
| """ | |
| import json | |
| import os | |
| import time | |
| import pandas as pd | |
| import requests | |
| from dotenv import load_dotenv | |
| class _NoopStreamlit: | |
| def error(*args, **kwargs): | |
| return None | |
| def toast(*args, **kwargs): | |
| return None | |
| def cache_data(*args, **kwargs): | |
| def _decorator(func): | |
| return func | |
| return _decorator | |
| def _resolve_streamlit(): | |
| """ | |
| - Streamlit 页面内:保留原能力(toast/cache_data) | |
| - 非 Streamlit 运行(如 Flask/FastAPI):使用 no-op,避免无运行时警告 | |
| """ | |
| try: | |
| import streamlit as _st | |
| from streamlit.runtime.scriptrunner import get_script_run_ctx | |
| if get_script_run_ctx(suppress_warning=True) is None: | |
| return _NoopStreamlit() | |
| return _st | |
| except Exception: | |
| return _NoopStreamlit() | |
| st = _resolve_streamlit() | |
| load_dotenv() | |
| TOKEN_FILE = "token_data.json" | |
| CINEMA_ID = os.getenv("CINEMA_ID") | |
| def load_token(): | |
| if os.path.exists(TOKEN_FILE): | |
| try: | |
| with open(TOKEN_FILE, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except (json.JSONDecodeError, FileNotFoundError): | |
| return None | |
| return None | |
| def save_token(token_data): | |
| try: | |
| with open(TOKEN_FILE, "w", encoding="utf-8") as f: | |
| json.dump(token_data, f, ensure_ascii=False, indent=4) | |
| return True | |
| except Exception as e: | |
| st.error(f"保存Token失败: {e}") | |
| return False | |
| def login_and_get_token(): | |
| username = os.getenv("CINEMA_USERNAME") | |
| password = os.getenv("CINEMA_PASSWORD") | |
| res_code = os.getenv("CINEMA_RES_CODE") | |
| device_id = os.getenv("CINEMA_DEVICE_ID") | |
| if not all([username, password, res_code]): | |
| st.error("登录失败:未配置用户名、密码或影院编码环境变量。") | |
| return None | |
| session = requests.Session() | |
| session.headers.update({ | |
| "Host": "app.bi.piao51.cn", | |
| "Accept": "application/json, text/javascript, */*; q=0.01", | |
| "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148", | |
| }) | |
| login_url = "https://app.bi.piao51.cn/cinema-app/credential/login.action" | |
| login_headers = { | |
| "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", | |
| "Origin": "https://app.bi.piao51.cn", | |
| } | |
| login_data = { | |
| "username": username, | |
| "password": password, | |
| "type": "1", | |
| "resCode": res_code, | |
| "deviceid": device_id, | |
| "dtype": "ios", | |
| } | |
| try: | |
| response_login = session.post(login_url, headers=login_headers, data=login_data, allow_redirects=False, timeout=15) | |
| if not (300 <= response_login.status_code < 400 and "token" in session.cookies): | |
| st.error(f"登录步骤 1 失败,未能获取 Session Token。状态码: {response_login.status_code}") | |
| return None | |
| user_info_url = "https://app.bi.piao51.cn/cinema-app/security/logined.action" | |
| response_user_info = session.get(user_info_url, timeout=10) | |
| response_user_info.raise_for_status() | |
| user_info = response_user_info.json() | |
| if user_info.get("success") and user_info.get("data", {}).get("token"): | |
| token_data = user_info["data"] | |
| if save_token(token_data): | |
| st.toast("登录成功,已获取并保存新 Token!", icon="🔑") | |
| return token_data | |
| st.error(f"登录步骤 2 失败,未能从 JSON 中提取 Token。响应: {user_info.get('msg')}") | |
| return None | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"登录请求过程中发生网络错误: {e}") | |
| return None | |
| def fetch_hall_info(token): | |
| url = "https://cawapi.yinghezhong.com/showInfo/getShowHallInfo" | |
| params = {"token": token, "_": int(time.time() * 1000)} | |
| headers = {"Origin": "https://caw.yinghezhong.com", "User-Agent": "Mozilla/5.0"} | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get("code") == 1 and data.get("data"): | |
| return {item["hallId"]: item["seatNum"] for item in data["data"]} | |
| raise Exception(f"获取影厅信息失败: {data.get('msg', '未知错误')}") | |
| def fetch_schedule_data(token, show_date): | |
| url = "https://cawapi.yinghezhong.com/showInfo/getHallShowInfo" | |
| params = {"showDate": show_date, "token": token, "_": int(time.time() * 1000)} | |
| headers = {"Origin": "https://caw.yinghezhong.com", "User-Agent": "Mozilla/5.0"} | |
| response = requests.get(url, params=params, headers=headers, timeout=15) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get("code") == 1: | |
| return data.get("data", []) | |
| if data.get("code") == 500: | |
| raise ValueError("Token 可能已失效") | |
| raise Exception(f"获取排片数据失败: {data.get('msg', '未知错误')}") | |
| def get_api_data_with_token_management(show_date): | |
| token_data = load_token() | |
| token = token_data.get("token") if token_data else None | |
| if not token: | |
| token_data = login_and_get_token() | |
| if not token_data: | |
| return None, None | |
| token = token_data.get("token") | |
| try: | |
| schedule_list = fetch_schedule_data(token, show_date) | |
| hall_seat_map = fetch_hall_info(token) | |
| return schedule_list, hall_seat_map | |
| except ValueError: | |
| st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄") | |
| token_data = login_and_get_token() | |
| if not token_data: | |
| return None, None | |
| token = token_data.get("token") | |
| try: | |
| schedule_list = fetch_schedule_data(token, show_date) | |
| hall_seat_map = fetch_hall_info(token) | |
| return schedule_list, hall_seat_map | |
| except Exception as e: | |
| st.error(f"重试获取数据失败: {e}") | |
| return None, None | |
| except Exception as e: | |
| st.error(f"获取 API 数据时发生错误: {e}") | |
| return None, None | |
| def fetch_canonical_movie_names(token, date_str): | |
| if not CINEMA_ID: | |
| return [] | |
| url = "https://app.bi.piao51.cn/cinema-app/mycinema/movieSellGross.action" | |
| params = { | |
| "token": token, | |
| "startDate": date_str, | |
| "endDate": date_str, | |
| "dateType": "day", | |
| "cinemaId": CINEMA_ID, | |
| } | |
| headers = { | |
| "Host": "app.bi.piao51.cn", | |
| "X-Requested-With": "XMLHttpRequest", | |
| "jwt": "0", | |
| "Accept": "application/json, text/javascript, */*; q=0.01", | |
| "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148", | |
| } | |
| try: | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get("code") == "A00000" and data.get("results"): | |
| return [ | |
| item["movieName"] | |
| for item in data["results"] | |
| if item.get("movieName") and item["movieName"] != "总计" | |
| ] | |
| except Exception as e: | |
| print(f"获取标准电影名称失败: {e}") | |
| return [] | |
| def clean_movie_title(raw_title, canonical_names=None): | |
| if not isinstance(raw_title, str): | |
| return raw_title | |
| base_name = None | |
| if canonical_names: | |
| sorted_names = sorted(canonical_names, key=len, reverse=True) | |
| for name in sorted_names: | |
| if name in raw_title: | |
| base_name = name | |
| break | |
| if not base_name: | |
| base_name = raw_title.split(" ", 1)[0] | |
| raw_upper = raw_title.upper() | |
| suffix = "" | |
| if "HDR LED" in raw_upper: | |
| suffix = "(HDR LED)" | |
| elif "CINITY" in raw_upper: | |
| suffix = "(CINITY)" | |
| elif "杜比" in raw_upper or "DOLBY" in raw_upper: | |
| suffix = "(杜比视界)" | |
| elif "IMAX" in raw_upper: | |
| suffix = "(数字IMAX3D)" if "3D" in raw_upper else "(数字IMAX)" | |
| elif "巨幕" in raw_upper: | |
| suffix = "(中国巨幕立体)" if "立体" in raw_upper else "(中国巨幕)" | |
| elif "3D" in raw_upper: | |
| suffix = "(数字3D)" | |
| if suffix and suffix not in base_name: | |
| return f"{base_name}{suffix}" | |
| return base_name | |
| def get_valid_token(force_refresh=False): | |
| token_data = None if force_refresh else load_token() | |
| if not token_data: | |
| token_data = login_and_get_token() | |
| if not token_data: | |
| return None | |
| return token_data.get("token") | |
| def fetch_schedule_api_bundle(show_date): | |
| """ | |
| 一次性获取排程相关 API 原始数据: | |
| - getHallShowInfo(场次列表) | |
| - getShowHallInfo(影厅座位映射) | |
| - movieSellGross(标准影片名称) | |
| """ | |
| schedule_list, hall_seat_map = get_api_data_with_token_management(show_date) | |
| if schedule_list is None or hall_seat_map is None: | |
| return None | |
| token_data = load_token() | |
| token = token_data.get("token") if token_data else None | |
| canonical_names = fetch_canonical_movie_names(token, show_date) if token else [] | |
| return { | |
| "show_date": show_date, | |
| "token": token, | |
| "schedule_list": schedule_list, | |
| "hall_seat_map": hall_seat_map, | |
| "canonical_names": canonical_names, | |
| } | |
| def process_schedule_dataframe(schedule_list, hall_seat_map, canonical_names=None): | |
| """将排程 API 原始数据整理成便于展示的表格。""" | |
| if not schedule_list: | |
| return pd.DataFrame() | |
| df = pd.DataFrame(schedule_list) | |
| if df.empty: | |
| return pd.DataFrame() | |
| df["座位数"] = df["hallId"].map(hall_seat_map or {}).fillna(0).astype(int) | |
| df.rename( | |
| columns={ | |
| "movieName": "影片名称", | |
| "showStartTime": "放映时间", | |
| "soldBoxOffice": "总收入", | |
| "soldTicketNum": "总人次", | |
| "hallName": "影厅名称", | |
| "showEndTime": "散场时间", | |
| }, | |
| inplace=True, | |
| ) | |
| if "影片名称" in df.columns: | |
| df["影片名称_清洗后"] = df["影片名称"].apply( | |
| lambda x: clean_movie_title(x, canonical_names) | |
| ) | |
| required_cols = [ | |
| "影片名称", | |
| "影片名称_清洗后", | |
| "放映时间", | |
| "散场时间", | |
| "影厅名称", | |
| "座位数", | |
| "总收入", | |
| "总人次", | |
| ] | |
| for col in required_cols: | |
| if col not in df.columns: | |
| df[col] = None | |
| df = df[required_cols] | |
| for col in ["座位数", "总收入", "总人次"]: | |
| df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) | |
| return df | |