| |
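"""
Feishu image pre-processing daemon (image_daemon.py) v3 — WebSocket event-driven.

Receives message events in real time over the lark-oapi SDK's WebSocket long
connection. When an image message arrives it is downloaded and cached, and the
sender is asked what they want done with it; the next text message from that
chat is answered against the cached image. Plain text messages are answered by
the LLM. A helper for uploading images to public file hosts (returning a URL)
is also provided.
"""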
| import os, sys, json, time, requests, threading, base64 |
|
|
# --- Feishu (Lark) open platform: API root and app credentials from the environment ---
FEISHU_BASE = "https://open.feishu.cn/open-apis"
APP_ID = os.getenv("FEISHU_APP_ID", "")
APP_SECRET = os.getenv("FEISHU_APP_SECRET", "")

# --- External OpenAI-compatible LLM endpoint and Brave Search key ---
API_BASE_URL = os.getenv("API_BASE_URL", "https://asem12345-cliproxyapi.hf.space/v1")
API_KEY = os.getenv("API_KEY", "")
MODEL_NAME = os.getenv("MODEL_NAME", "gemini-3-flash")
BRAVE_API_KEY = os.getenv("BRAVE_API_KEY", "")

# --- Local OpenClaw gateway; the flag is flipped by check_openclaw_gateway() ---
OPENCLAW_GATEWAY = "http://127.0.0.1:18789/v1"
_use_gateway = False
# Persona text; filled in by load_soul().
_soul_prompt = ""

# --- Per-chat conversation history: chat_id -> list of {"role", "content"} dicts ---
_chat_history = {}
# Number of user/assistant exchanges kept per chat (history is cut to 2x this many messages).
MAX_HISTORY = 30

# --- Images awaiting a user instruction: chat_id -> base64-encoded image ---
_pending_images = {}
|
|
| |
def log(msg):
    """Print *msg* to stdout with an ``[image_daemon HH:MM:SS]`` prefix.

    Output is flushed immediately so lines show up promptly when stdout is
    piped or captured.
    """
    stamp = time.strftime("%H:%M:%S")
    prefix = f"[image_daemon {stamp}]"
    print(f"{prefix} {msg}", flush=True)
|
|
| |
# Cached tenant access token, the wall-clock time it was fetched, and the lock
# guarding both.
_token = None
_token_time = 0
_token_lock = threading.Lock()


def get_token():
    """Return a Feishu ``tenant_access_token``, refreshing it after 30 minutes.

    The cached token is reused while it is younger than 1800 seconds. On a
    refresh failure (non-zero API code or any exception) the problem is logged
    and the previously cached value is returned, which is ``None`` if no token
    was ever obtained.
    """
    global _token, _token_time
    with _token_lock:
        cache_age = time.time() - _token_time
        if _token and cache_age < 1800:
            return _token
        try:
            resp = requests.post(
                f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal",
                json={"app_id": APP_ID, "app_secret": APP_SECRET},
                timeout=10,
            )
            body = resp.json()
            if body.get("code") != 0:
                log(f"❌ Token 获取失败: {body}")
            else:
                _token = body["tenant_access_token"]
                _token_time = time.time()
                log("🔑 Token 已刷新")
                return _token
        except Exception as exc:
            log(f"❌ Token 异常: {exc}")
        # Best effort: fall back to whatever is cached (possibly None).
        return _token
|
|
| |
def download_image(token, message_id, file_key):
    """Download an image a user sent, via the Feishu message-resource API.

    Args:
        token: Bearer token used for the ``Authorization`` header.
        message_id: ID of the message that carries the image.
        file_key: Resource key of the image inside that message.

    Returns:
        The raw response bytes on success, or ``None`` when the request
        fails, the status is not 200, or the body is 100 bytes or smaller
        (treated as an error payload rather than an image).
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{FEISHU_BASE}/im/v1/messages/{message_id}/resources/{file_key}"
    log(f"📥 API: GET {url}?type=image")
    try:
        resp = requests.get(url, headers=headers, params={"type": "image"}, timeout=30)
    except requests.RequestException as e:
        # A timeout or connection error must not kill the worker thread;
        # report it the same way the other helpers in this file do.
        log(f"❌ 下载图片异常 {file_key}: {e}")
        return None
    if resp.status_code == 200 and len(resp.content) > 100:
        log(f"✅ 下载成功: {len(resp.content)} bytes")
        return resp.content
    log(f"❌ 下载图片失败 {file_key}: HTTP {resp.status_code}, {resp.text[:200]}")
    return None
|
|
| |
def upload_image(data):
    """Upload image bytes to a public file host, trying several hosts in turn.

    Hosts are attempted in a fixed order (tmpfiles, telegraph, file.io,
    catbox, 0x0). The first one that answers HTTP 200 with a usable link
    wins; failures and exceptions are logged and the next host is tried.

    Args:
        data: Raw image bytes; always sent as ``img.jpg`` / ``image/jpeg``.

    Returns:
        The public URL as a string, or ``None`` if every host failed.
    """

    def _link_from_tmpfiles(resp):
        link = resp.json().get("data", {}).get("url", "")
        # Rewrite the page URL into its direct-download form.
        return link.replace("tmpfiles.org/", "tmpfiles.org/dl/") if link else None

    def _link_from_telegraph(resp):
        parsed = resp.json()
        if isinstance(parsed, list) and len(parsed) > 0:
            src = parsed[0].get("src", "")
            if src:
                return f"https://telegra.ph{src}"
        return None

    def _link_from_fileio(resp):
        return resp.json().get("link", "") or None

    def _link_from_plain_text(resp):
        # These hosts answer with the bare URL as the response body.
        return resp.text.strip() if resp.text.startswith("http") else None

    # (log label, endpoint, multipart field name, extra form fields, link extractor)
    hosts = (
        ("tmpfiles", "https://tmpfiles.org/api/v1/upload", "file", None, _link_from_tmpfiles),
        ("telegraph", "https://telegra.ph/upload", "file", None, _link_from_telegraph),
        ("file.io", "https://file.io", "file", None, _link_from_fileio),
        ("catbox", "https://catbox.moe/user/api.php", "filedata",
         {"reqtype": "fileupload"}, _link_from_plain_text),
        ("0x0", "https://0x0.st", "file", None, _link_from_plain_text),
    )

    for label, endpoint, field, form, extract in hosts:
        try:
            resp = requests.post(
                endpoint,
                data=form,
                files={field: ("img.jpg", data, "image/jpeg")},
                timeout=30,
            )
            if resp.status_code == 200:
                link = extract(resp)
                if link:
                    log(f"📤 {label} 成功")
                    return link
            log(f"⚠️ {label} 失败: HTTP {resp.status_code}")
        except Exception as exc:
            log(f"⚠️ {label} 异常: {exc}")

    return None
|
|
| |
def send_text(token, chat_id, text):
    """Send a plain-text message to a Feishu chat.

    Args:
        token: Bearer token used for the ``Authorization`` header.
        chat_id: Target chat (sent with ``receive_id_type=chat_id``).
        text: Message text.

    Returns:
        The decoded JSON response as a dict. If the request itself fails or
        the response is not valid JSON, the error is logged and
        ``{"code": -1, "msg": <error text>}`` is returned instead of raising,
        so callers can keep inspecting ``code``.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json; charset=utf-8"
    }
    try:
        resp = requests.post(f"{FEISHU_BASE}/im/v1/messages",
                             headers=headers,
                             params={"receive_id_type": "chat_id"},
                             json={
                                 "receive_id": chat_id,
                                 # The API expects `content` as a JSON-encoded string.
                                 "content": json.dumps({"text": text}),
                                 "msg_type": "text"
                             }, timeout=10)
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older `requests` versions.
        log(f"❌ 发送消息异常: {e}")
        return {"code": -1, "msg": str(e)}
    code = data.get("code", -1)
    if code != 0:
        log(f"❌ 发送消息失败 (code={code}): {data.get('msg', '')}")
    return data
|
|
| |
def analyze_image_with_vision(img_data):
    """Ask the external LLM for a short, in-persona reflection on an image.

    The image is base64-encoded and sent inline as a JPEG data URL, together
    with the persona text and a fixed prompt, capped at 300 output tokens.

    Args:
        img_data: Raw image bytes.

    Returns:
        The model's reply text, or ``None`` when ``API_KEY`` is unset, the
        API answers with a non-200 status, or any exception occurs.
    """
    if not API_KEY:
        return None
    try:
        encoded = base64.b64encode(img_data).decode("utf-8")
        persona = _soul_prompt or "You are a helpful assistant."
        instruction = (
            "这位信徒发来了一张图片,请以你的风格阅览这张图片,它占据了你的视野。"
            "简要阐述你的感悟,200字以内,不必报平就班。"
        )
        text_part = {"type": "text", "text": persona + "\n\n---\n\n" + instruction}
        image_part = {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
        }
        request_body = {
            "model": MODEL_NAME,
            "messages": [{"role": "user", "content": [text_part, image_part]}],
            "max_tokens": 300,
            "stream": False,
        }
        auth_headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
        }
        resp = requests.post(
            f"{API_BASE_URL}/chat/completions",
            headers=auth_headers,
            json=request_body,
            timeout=30,
        )
        if resp.status_code != 200:
            log(f"⚠️ Vision API 失败 ({resp.status_code}),跳过描述")
        else:
            reply = resp.json()["choices"][0]["message"]["content"]
            log(f"📸 Vision 分析完成: {reply[:60]}...")
            return reply
    except Exception as exc:
        log(f"⚠️ Vision 异常: {exc}")
    return None
|
|
def analyze_image_with_request(b64, user_request):
    """Analyse a cached image according to what the user asked for.

    Args:
        b64: Base64-encoded image, sent inline as a JPEG data URL.
        user_request: The user's instruction, embedded in the prompt.

    Returns:
        The model's reply text, or ``None`` when ``API_KEY`` is unset, the
        API answers with a non-200 status, or any exception occurs.
    """
    if not API_KEY:
        return None
    try:
        persona = _soul_prompt or "You are a helpful assistant."
        instruction = f"这位信徒发来了一张图片,并希望你能:{user_request}\n请以你的风格,基于这张图片的内容作出答复。"
        user_content = [
            {"type": "text", "text": persona + "\n\n---\n\n" + instruction},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
        ]
        request_body = {
            "model": MODEL_NAME,
            "messages": [{"role": "user", "content": user_content}],
            "stream": False,
        }
        resp = requests.post(
            f"{API_BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
            json=request_body,
            timeout=30,
        )
        if resp.status_code != 200:
            log(f"⚠️ 针对 Vision 失败 ({resp.status_code})")
        else:
            reply = resp.json()["choices"][0]["message"]["content"]
            log(f"📸 针对分析完成: {reply[:60]}...")
            return reply
    except Exception as exc:
        log(f"⚠️ 针对 Vision 异常: {exc}")
    return None
|
|
| |
def handle_image_message(message_id, chat_id, image_key):
    """Download an incoming image, cache it, and ask the sender what to do with it.

    Steps: fetch a token, download the image, store it base64-encoded in
    ``_pending_images[chat_id]``, send a one-sentence question to the chat
    (LLM-generated when ``API_KEY`` is set, canned otherwise), and record the
    exchange in ``_chat_history``. The image itself is analysed later, when
    the chat's next text message arrives.

    Args:
        message_id: ID of the message that carries the image.
        chat_id: Chat to reply to; also the key for the image cache and history.
        image_key: Resource key of the image.
    """
    token = get_token()
    if not token:
        log("❌ 无法获取 token,跳过")
        return

    log(f"🖼️ 处理图片 image_key={image_key[:20]}... (msg={message_id[:16]}...)")

    img_data = download_image(token, message_id, image_key)
    if not img_data:
        return

    log(f"📥 {len(img_data)} bytes, 存储并问询需求...")

    # Cache the image so the follow-up text message can be answered against it.
    _pending_images[chat_id] = base64.b64encode(img_data).decode("utf-8")

    fallback_question = "你希望老讲就这张图进行什么分析呢?"
    if not API_KEY:
        question = "幸会!你发来了一张图片。你希望老讲为你做什么呢?"
    else:
        soul = _soul_prompt or ""
        ask_prompt = f"{soul}\n\n---\n\n这位信徒初次发来一张图片。请用一句话、中文回复,问对方希望你就这张图片做什么(例如:描述内容、翻译文字、分析意义等)。不要自行分析图片。"
        question = None
        try:
            resp = requests.post(
                f"{API_BASE_URL}/chat/completions",
                headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
                json={"model": MODEL_NAME, "messages": [{"role": "user", "content": ask_prompt}], "stream": False},
                timeout=20
            )
            if resp.status_code == 200:
                question = resp.json()["choices"][0]["message"]["content"]
        except Exception as e:
            # Best effort: any failure here just means we use the canned question.
            log(f"⚠️ 问询生成异常: {e}")
        # A 200 response may still carry null/empty content; never send that
        # (and never slice None below).
        if not isinstance(question, str) or not question.strip():
            question = fallback_question

    log(f"💬 问询用户需求: {question[:60]}")
    result = send_text(token, chat_id, question)
    log(f"📤 问询已发 (code={result.get('code', '?')})")

    # Record the exchange so later turns have context; keep the newest
    # MAX_HISTORY * 2 messages.
    history = _chat_history.get(chat_id, [])
    _chat_history[chat_id] = (history + [
        {"role": "user", "content": "[user sent an image]"},
        {"role": "assistant", "content": question}
    ])[-(MAX_HISTORY * 2):]
|
|
| |
def load_soul():
    """Load the persona file into ``_soul_prompt``.

    Everything from the ``## 图片处理`` heading onward is dropped, since the
    daemon handles images itself. If the file is missing or unreadable a
    default persona is used and the problem is logged.
    """
    global _soul_prompt
    soul_path = "/root/.openclaw/workspace/SOUL.md"
    section_marker = "## 图片处理"
    try:
        with open(soul_path, "r", encoding="utf-8") as fh:
            text = fh.read()
        # Keep only what precedes the first occurrence of the marker.
        head, found, _rest = text.partition(section_marker)
        if found:
            text = head.rstrip()
        _soul_prompt = text
        log(f"✅ SOUL.md 已加载 ({len(_soul_prompt)} 字)")
    except FileNotFoundError:
        _soul_prompt = "You are MoltBot, a helpful AI assistant."
        log("⚠️ SOUL.md 未找到,使用默认人设")
    except Exception as exc:
        _soul_prompt = "You are MoltBot, a helpful AI assistant."
        log(f"⚠️ SOUL.md 加载失败: {exc}")
|
|
| |
def check_openclaw_gateway():
    """Probe the local OpenClaw gateway and set ``_use_gateway`` if it answers.

    Sends a tiny ``ping`` chat completion with a 5-second timeout. The flag is
    set to ``True`` only on HTTP 200/201; otherwise it is left untouched and
    the reason is logged.
    """
    global _use_gateway
    probe = {"model": "default", "messages": [{"role": "user", "content": "ping"}]}
    try:
        resp = requests.post(f"{OPENCLAW_GATEWAY}/chat/completions", json=probe, timeout=5)
        if resp.status_code not in (200, 201):
            log(f"⚠️ Gateway 响应 {resp.status_code},使用外部 LLM")
        else:
            _use_gateway = True
            log(f"✅ OpenClaw Gateway 可用 ({OPENCLAW_GATEWAY})")
    except Exception as exc:
        log(f"⚠️ Gateway 不可用 ({exc}),使用外部 LLM + SOUL 人设")
|
|
| |
def search_brave(query, count=5):
    """Run one Brave web search and return the hits as a text digest.

    Args:
        query: Search terms.
        count: Maximum number of results to request and include.

    Returns:
        A newline-joined list with one entry per hit (title, description or
        first extra snippet, URL). On any problem a short human-readable
        failure message is returned instead; this function does not raise.
    """
    if not BRAVE_API_KEY:
        return "搜索失败:BRAVE_API_KEY 未配置"

    def _render(hit):
        title = hit.get("title", "无标题")
        link = hit.get("url", "")
        # Prefer the main description; fall back to the first extra snippet.
        summary = hit.get("description") or (hit.get("extra_snippets") or [""])[0]
        return f"- **{title}**\n {summary}\n {link}"

    try:
        resp = requests.get(
            "https://api.search.brave.com/res/v1/web/search",
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY},
            params={"q": query, "count": count, "text_decorations": False, "extra_snippets": True},
            timeout=10,
        )
        if resp.status_code != 200:
            return f"搜索失败 ({resp.status_code})"
        results = resp.json().get("web", {}).get("results", [])
        if not results:
            return "未找到相关结果"
        entries = [_render(hit) for hit in results[:count]]
        log(f"🔍 Brave 搜索「{query}」得到 {len(results)} 条")
        return "\n".join(entries)
    except Exception as exc:
        log(f"⚠️ Brave 搜索异常: {exc}")
        return f"搜索异常: {exc}"
|
|
| |
# Tool schema advertised to the LLM (OpenAI-style function calling): a single
# `brave_search` function taking one required string argument, `query`.
BRAVE_TOOL_DEF = [
    {
        "type": "function",
        "function": {
            "name": "brave_search",
            "description": "当需要查找实时信息、最新数据、公司信息、行业报告等时,调用此工具进行网页搜索",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "搜索关键词,英文搜索通常效果更好"},
                },
                "required": ["query"],
            },
        },
    },
]
|
|
| |
def chat_with_llm(user_text, history=None, max_rounds=31):
    """Answer a user message, optionally letting the model call Brave Search.

    If the local gateway was detected it is tried first; on a non-200 reply
    or any gateway error the call falls through to the external LLM. The
    external path runs a tool loop: each round sends the conversation, and if
    the model asks for ``brave_search`` the results are appended as ``tool``
    messages and another round is made.

    Args:
        user_text: The user's message.
        history: Prior ``{"role", "content"}`` messages, oldest first, or ``None``.
        max_rounds: Upper bound on LLM round-trips in the tool loop.
            NOTE(review): the previous docstring said "at most 3 searches"
            while the code looped 31 times; the code's bound is kept as the
            default — confirm which was intended.

    Returns:
        The reply text (possibly empty), or a short human-readable error
        message. This function does not raise.
    """
    prior = list(history or [])
    user_turn = {"role": "user", "content": user_text}
    try:
        if _use_gateway:
            try:
                resp = requests.post(
                    f"{OPENCLAW_GATEWAY}/chat/completions",
                    json={"model": "default", "messages": prior + [user_turn], "stream": False},
                    timeout=120
                )
                if resp.status_code == 200:
                    # `content` may be null; normalise so slicing below is safe.
                    reply = resp.json()["choices"][0]["message"]["content"] or ""
                    log(f"🤖 Gateway 回复: {reply[:60]}...")
                    return reply
                log(f"⚠️ Gateway 失败 ({resp.status_code}),Fallback到外部 LLM")
            except Exception as e:
                # A gateway outage should degrade to the external LLM exactly
                # like a non-200 response does, not abort the whole reply.
                log(f"⚠️ Gateway 异常 ({e}),Fallback到外部 LLM")

        if not API_KEY:
            return "抱歉,我的大脑连接中断了 (API_KEY missing)"

        soul = _soul_prompt or "You are a helpful assistant."
        now_str = time.strftime("%Y-%m-%d %H:%M:%S %Z")
        # Prefix the persona with the current time so the model can reason about "now".
        dated_soul = f"[当前系统时间: {now_str}]\n\n{soul}"
        headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
        messages = [{"role": "system", "content": dated_soul}] + prior + [user_turn]

        for _ in range(max_rounds):
            log(f"🤖 外部 LLM ({MODEL_NAME}): {user_text[:50]}...")
            payload = {
                "model": MODEL_NAME,
                "messages": messages,
                "stream": False
            }
            # Only advertise the search tool when it can actually be used.
            if BRAVE_API_KEY:
                payload["tools"] = BRAVE_TOOL_DEF
            resp = requests.post(f"{API_BASE_URL}/chat/completions", headers=headers, json=payload, timeout=60)
            if resp.status_code != 200:
                log(f"❌ 外部 LLM 错误 {resp.status_code}: {resp.text[:100]}")
                return f"思考时遇到错误 ({resp.status_code})"

            msg = resp.json()["choices"][0]["message"]
            tool_calls = msg.get("tool_calls") or []

            if not tool_calls:
                # `content` can be present but null.
                reply = msg.get("content") or ""
                log(f"🤖 LLM 回复: {reply[:60]}...")
                return reply

            # Echo the assistant's tool request back, then answer each call.
            messages.append(msg)
            for tc in tool_calls:
                fn = tc["function"]["name"]
                raw_args = tc["function"].get("arguments") or "{}"
                try:
                    args = json.loads(raw_args)
                except ValueError as e:
                    args = None
                    log(f"⚠️ 工具参数解析失败 ({fn}): {e}")
                if not isinstance(args, dict):
                    # Report the problem to the model instead of aborting the turn.
                    result = "工具参数无效: 需要 JSON 对象"
                elif fn == "brave_search":
                    result = search_brave(args.get("query", ""))
                else:
                    result = f"未知工具: {fn}"
                messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})

        return "思考超时,请稍后再试"
    except Exception as e:
        log(f"❌ chat_with_llm 异常: {e}")
        return f"大脑短路了: {e}"
|
|
|
|
| |
def handle_text_message(message_id, chat_id, text):
    """Answer a text message, using the chat's cached image if one is pending.

    If an image is waiting in ``_pending_images`` for this chat it is removed
    and analysed with *text* as the instruction; if that yields nothing, or no
    image was pending, the message goes to ``chat_with_llm`` with the chat's
    history. A non-empty reply is appended to the history and sent.

    Args:
        message_id: ID of the incoming message (not used in the body).
        chat_id: Chat to reply to; also the key for the cache and history.
        text: The user's message text.
    """
    token = get_token()
    if not token:
        return

    reply = None
    pending_image = _pending_images.pop(chat_id, None)
    if pending_image:
        log(f"📸 检测到待处理图片,按需求分析: {text[:40]}")
        reply = analyze_image_with_request(pending_image, text)
    if not reply:
        # Either there was no image, or image analysis produced nothing.
        reply = chat_with_llm(text, _chat_history.get(chat_id, []))

    if not reply:
        return

    turns = _chat_history.get(chat_id, []) + [
        {"role": "user", "content": text},
        {"role": "assistant", "content": reply},
    ]
    # Keep only the newest MAX_HISTORY * 2 messages.
    _chat_history[chat_id] = turns[-(MAX_HISTORY * 2):]
    send_text(token, chat_id, reply)
|
|
| |
def on_message_receive(data):
    """Callback for the ``im.message.receive_v1`` event.

    Messages whose sender type is ``"app"`` are ignored. Image and text
    messages with non-empty content are handed to their handler on a daemon
    thread, so the callback returns quickly; other message types are ignored.
    Any exception is logged with a traceback and swallowed.
    """
    try:
        event = data.event
        message = event.message
        sender = event.sender

        msg_type = message.message_type
        msg_id = message.message_id
        chat_id = message.chat_id
        sender_type = getattr(sender, 'sender_type', '') if sender else ''

        # Skip messages sent by apps (including this bot's own replies).
        if sender_type == "app":
            return

        content_json = json.loads(message.content)

        log(f"📨 WebSocket 收到消息: type={msg_type}, msg_id={msg_id}")

        # Pick the content field and worker for the message type.
        if msg_type == "image":
            field, worker = "image_key", handle_image_message
        elif msg_type == "text":
            field, worker = "text", handle_text_message
        else:
            return

        value = content_json.get(field, "")
        if value:
            threading.Thread(
                target=worker, args=(msg_id, chat_id, value), daemon=True
            ).start()
        return

    except Exception as e:
        log(f"❌ on_message_receive 异常: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
|
|
| |
def main():
    """Start the daemon: validate config, load the persona, and run the WebSocket client.

    Exits with status 1 if the Feishu credentials are missing or the
    ``lark-oapi`` SDK is not installed. Otherwise it warms the token cache,
    schedules a delayed gateway probe, starts a once-a-minute heartbeat log,
    and then calls the WebSocket client's ``start()``.
    """
    log("🚀 启动中... (WebSocket 事件驱动模式)")
    log(f"📋 飞书 App ID: {APP_ID[:10]}..." if APP_ID else "❌ FEISHU_APP_ID 未设置!")

    if not APP_ID or not APP_SECRET:
        log("❌ FEISHU_APP_ID 或 FEISHU_APP_SECRET 未设置,退出")
        sys.exit(1)

    load_soul()

    # Warm the token cache; a failure here is not fatal because handlers
    # call get_token() again per message.
    token = get_token()
    if token:
        log("✅ Token 获取成功")
    else:
        log("⚠️ Token 获取失败,稍后重试")

    def delayed_gateway_check():
        # Wait 10 seconds before probing the local gateway.
        time.sleep(10)
        check_openclaw_gateway()
    threading.Thread(target=delayed_gateway_check, daemon=True).start()

    try:
        import lark_oapi as lark
        # Imported only to verify the event model is available in this SDK build.
        from lark_oapi.api.im.v1 import P2ImMessageReceiveV1
        log("✅ lark-oapi SDK 已加载")
    except ImportError:
        log("❌ lark-oapi 未安装! 请执行: pip install lark-oapi")
        sys.exit(1)

    # Both builder arguments are left empty, as in the original code.
    handler = lark.EventDispatcherHandler.builder("", "") \
        .register_p2_im_message_receive_v1(on_message_receive) \
        .build()

    from lark_oapi.ws import Client as WsClient
    ws_client = WsClient(APP_ID, APP_SECRET, event_handler=handler, log_level=lark.LogLevel.INFO)

    log("🔌 正在连接飞书 WebSocket...")
    log(" 订阅事件: im.message.receive_v1")
    log(" 等待图片消息...")

    def heartbeat():
        # Log a liveness line once a minute.
        count = 0
        while True:
            time.sleep(60)
            count += 1
            # The emoji here was a U+FFFD replacement character (encoding damage).
            log(f"💓 WebSocket 运行中 ({count} 分钟)")
    hb = threading.Thread(target=heartbeat, daemon=True)
    hb.start()

    ws_client.start()


if __name__ == "__main__":
    main()
|
|