Spaces:
Sleeping
Sleeping
| # ''' | |
| # Author: mdhuang555 67590178+mdhuang555@users.noreply.github.com | |
| # Date: 2025-03-30 15:57:22 | |
| # LastEditors: mdhuang555 67590178+mdhuang555@users.noreply.github.com | |
| # LastEditTime: 2025-04-03 11:32:30 | |
| # FilePath: \Notyif\dataBaseConnecter.py | |
| # Description: 数据库连接器,支持SSL连接 | |
| # ''' | |
| import socket | |
| import json | |
| import mysql.connector | |
| from typing import Dict, Any, Optional | |
| import yaml | |
| from pathlib import Path | |
| class DatabaseConnector: | |
| def __init__(self, port: int = 3306): # Removed host parameter | |
| # self.host is now determined after loading config | |
| self.port = port | |
| self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.config = self._load_config() | |
| # Set server host from config or default to '0.0.0.0' for broader accessibility | |
| self.host = self.config.get('server', {}).get('host', '0.0.0.0') | |
| def _load_config(self) -> Dict[str, Any]: | |
| """加载配置文件""" | |
| try: | |
| config_path = Path(__file__).parent / "Notify_config.yaml" # Load from Notify_config.yaml | |
| with open(config_path, "r", encoding="utf-8") as f: | |
| return yaml.safe_load(f) | |
| except Exception as e: | |
| print(f"加载配置文件错误: {e}") | |
| return {} | |
| def connect_db(self) -> Optional[mysql.connector.MySQLConnection]: | |
| """连接到MySQL数据库,使用SSL连接""" | |
| try: | |
| # 获取SSL证书路径 | |
| current_dir = Path(__file__).parent.absolute() | |
| ssl_ca_path = current_dir / "DigiCertGlobalRootCA.crt.pem" | |
| # 确保SSL证书文件存在 | |
| if not ssl_ca_path.exists(): | |
| raise FileNotFoundError(f"SSL证书文件未找到: {ssl_ca_path}") | |
| # 建立数据库连接 | |
| conn = mysql.connector.connect( | |
| host=self.config["mysql"]["host"], | |
| port=self.config["mysql"].get("port", 3306), | |
| user=self.config["mysql"]["user"], | |
| password=self.config["mysql"]["password"], | |
| database=self.config["mysql"]["database"], | |
| ssl_ca=str(ssl_ca_path), | |
| ssl_disabled=False, | |
| charset='utf8mb4', | |
| collation='utf8mb4_unicode_ci' | |
| ) | |
| return conn | |
| except Exception as e: | |
| print(f"数据库连接错误: {e}") | |
| return None | |
| def extract_text(self, conn: mysql.connector.MySQLConnection, table: str, column: str) -> list: | |
| """从指定表格和列中提取文本""" | |
| try: | |
| cursor = conn.cursor(dictionary=True) | |
| # 如果请求所有列,则获取完整的行数据 | |
| if column == '*': | |
| query = f"SELECT * FROM {table}" | |
| else: | |
| query = f"SELECT {column} FROM {table}" | |
| cursor.execute(query) | |
| results = cursor.fetchall() | |
| cursor.close() | |
| return results | |
| except Exception as e: | |
| print(f"提取文本错误: {e}") | |
| return [] | |
| def start_server(self): | |
| """启动服务器监听请求""" | |
| self.server_socket.bind((self.host, self.port)) | |
| self.server_socket.listen(5) | |
| print(f"服务器启动在 {self.host}:{self.port}") | |
| while True: | |
| try: | |
| client_socket, address = self.server_socket.accept() | |
| print(f"接受来自 {address} 的连接") | |
| # 接收客户端请求 | |
| data = client_socket.recv(1024).decode('utf-8') | |
| request = json.loads(data) | |
| # 处理请求 | |
| table = request.get('table') | |
| column = request.get('column') | |
| # 连接数据库并提取文本 | |
| conn = self.connect_db() | |
| if conn: | |
| try: | |
| results = self.extract_text(conn, table, column) | |
| response = {'status': 'success', 'data': results} | |
| except Exception as e: | |
| response = {'status': 'error', 'message': str(e)} | |
| finally: | |
| conn.close() | |
| else: | |
| response = {'status': 'error', 'message': '数据库连接失败'} | |
| # 发送响应 | |
| response_data = json.dumps(response, ensure_ascii=False) | |
| response_bytes = response_data.encode('utf-8') | |
| # 先发送数据长度 | |
| length_prefix = len(response_bytes).to_bytes(4, byteorder='big') | |
| client_socket.send(length_prefix) | |
| # 再发送实际数据 | |
| client_socket.send(response_bytes) | |
| client_socket.close() | |
| except Exception as e: | |
| print(f"处理请求错误: {e}") | |
| continue | |
| if __name__ == "__main__": | |
| server = DatabaseConnector() | |
| server.start_server() |