Spaces:
Sleeping
Sleeping
Update main.py
Browse files
main.py
CHANGED
|
@@ -1,36 +1,48 @@
|
|
| 1 |
import json, os
|
| 2 |
import gradio as gr
|
| 3 |
from fastapi.middleware.cors import CORSMiddleware
|
| 4 |
-
from fastapi import FastAPI, Request,
|
| 5 |
import google.generativeai as genai
|
| 6 |
import base64
|
| 7 |
from collections import defaultdict
|
| 8 |
from linebot import LineBotApi, WebhookHandler
|
| 9 |
from linebot.exceptions import InvalidSignatureError
|
| 10 |
-
from linebot.models import
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 11 |
import PIL.Image
|
| 12 |
import tempfile
|
|
|
|
|
|
|
| 13 |
|
| 14 |
# 設定 Google AI API 金鑰
|
| 15 |
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
|
| 16 |
|
| 17 |
# 設定生成文字的參數
|
| 18 |
-
generation_config = genai.types.GenerationConfig(
|
|
|
|
|
|
|
| 19 |
|
| 20 |
-
# 使用 Gemini
|
| 21 |
-
model = genai.GenerativeModel(
|
|
|
|
|
|
|
|
|
|
|
|
|
| 22 |
|
| 23 |
-
#
|
| 24 |
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
|
| 25 |
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
|
| 26 |
|
| 27 |
-
# 設定是否正在與使用者交談
|
| 28 |
-
working_status = os.getenv("DEFALUT_TALKING", default = "true").lower() == "true"
|
| 29 |
-
|
| 30 |
# 建立 FastAPI 應用程式
|
| 31 |
app = FastAPI()
|
| 32 |
|
| 33 |
-
# 設定 CORS
|
| 34 |
app.add_middleware(
|
| 35 |
CORSMiddleware,
|
| 36 |
allow_origins=["*"],
|
|
@@ -39,48 +51,29 @@ app.add_middleware(
|
|
| 39 |
allow_headers=["*"],
|
| 40 |
)
|
| 41 |
|
| 42 |
-
# 處理根路徑請求
|
| 43 |
@app.get("/")
|
| 44 |
def root():
|
| 45 |
return {"title": "Line Bot"}
|
| 46 |
|
| 47 |
-
# 處理 Line Webhook 請求
|
| 48 |
@app.post("/webhook")
|
| 49 |
-
async def webhook(
|
| 50 |
-
request: Request,
|
| 51 |
-
background_tasks: BackgroundTasks,
|
| 52 |
-
x_line_signature=Header(None),
|
| 53 |
-
):
|
| 54 |
-
# 取得請求內容
|
| 55 |
body = await request.body()
|
| 56 |
try:
|
| 57 |
-
# 將處理 Line 事件的任務加入背景工作
|
| 58 |
background_tasks.add_task(
|
| 59 |
line_handler.handle, body.decode("utf-8"), x_line_signature
|
| 60 |
)
|
| 61 |
except InvalidSignatureError:
|
| 62 |
-
# 處理無效的簽章錯誤
|
| 63 |
raise HTTPException(status_code=400, detail="Invalid signature")
|
| 64 |
return "ok"
|
| 65 |
-
|
| 66 |
-
#==========================
|
| 67 |
-
# 使用者請求生成圖片
|
| 68 |
-
#==========================
|
| 69 |
-
def upload_image_to_imgur_with_token(image_path, access_token):
|
| 70 |
-
"""
|
| 71 |
-
使用 Imgur OAuth2 Access Token 上傳圖片
|
| 72 |
-
"""
|
| 73 |
-
try:
|
| 74 |
-
with open(image_path, "rb") as img_file:
|
| 75 |
-
image_binary = img_file.read()
|
| 76 |
|
|
|
|
|
|
|
|
|
|
| 77 |
headers = {"Authorization": f"Bearer {access_token}"}
|
| 78 |
-
|
| 79 |
data = {
|
| 80 |
"image": base64.b64encode(image_binary).decode("utf-8"),
|
| 81 |
"type": "base64",
|
| 82 |
}
|
| 83 |
-
|
| 84 |
response = httpx.post("https://api.imgur.com/3/image", headers=headers, data=data)
|
| 85 |
if response.status_code == 200:
|
| 86 |
return response.json()["data"]["link"]
|
|
@@ -91,10 +84,7 @@ def upload_image_to_imgur_with_token(image_path, access_token):
|
|
| 91 |
print("圖片上傳例外:", e)
|
| 92 |
return None
|
| 93 |
|
| 94 |
-
|
| 95 |
-
#==========================
|
| 96 |
-
# 使用者上傳圖片
|
| 97 |
-
#==========================
|
| 98 |
def get_image_url(message_id):
|
| 99 |
try:
|
| 100 |
message_content = line_bot_api.get_message_content(message_id)
|
|
@@ -107,48 +97,31 @@ def get_image_url(message_id):
|
|
| 107 |
print(f"Error getting image: {e}")
|
| 108 |
return None
|
| 109 |
|
| 110 |
-
#
|
| 111 |
-
|
| 112 |
-
def store_user_message(user_id, message_type, message_content):
|
| 113 |
-
"""
|
| 114 |
-
儲存用戶的訊息
|
| 115 |
-
"""
|
| 116 |
-
user_message_history[user_id].append({
|
| 117 |
-
"type": message_type,
|
| 118 |
-
"content": message_content})
|
| 119 |
-
|
| 120 |
-
def analyze_with_gemini(image_path, user_text):
|
| 121 |
-
"""
|
| 122 |
-
分析用戶問題和圖片,並返回 Gemini 的回應
|
| 123 |
-
"""
|
| 124 |
try:
|
| 125 |
-
# 確保圖片存在
|
| 126 |
if not os.path.exists(image_path):
|
| 127 |
raise FileNotFoundError(f"圖片路徑無效:{image_path}")
|
| 128 |
-
|
| 129 |
organ = PIL.Image.open(image_path)
|
| 130 |
response = chat.send_message([user_text, organ])
|
| 131 |
-
|
| 132 |
-
# 提取回應內容
|
| 133 |
return response.text
|
| 134 |
-
|
| 135 |
except Exception as e:
|
| 136 |
return f"發生錯誤: {e}"
|
| 137 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 138 |
|
| 139 |
def get_previous_message(user_id):
|
| 140 |
-
"""
|
| 141 |
-
獲取用戶的上一則訊息
|
| 142 |
-
"""
|
| 143 |
if user_id in user_message_history and len(user_message_history[user_id]) > 0:
|
| 144 |
-
# 返回最後一則訊息
|
| 145 |
return user_message_history[user_id][-1]
|
| 146 |
return None
|
| 147 |
|
| 148 |
-
|
| 149 |
-
# 主程式(圖片與文字)
|
| 150 |
-
#==========================
|
| 151 |
-
# 建立 chat_sessions 字典
|
| 152 |
chat_sessions = {}
|
| 153 |
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
|
| 154 |
def handle_image_message(event):
|
|
@@ -160,23 +133,24 @@ def handle_image_message(event):
|
|
| 160 |
|
| 161 |
if user_text and user_text.startswith("請幫我生成圖片"):
|
| 162 |
prompt = user_text.replace("請幫我生成圖片", "").strip()
|
| 163 |
-
|
| 164 |
image_model = genai.GenerativeModel("gemini-2.0-flash-exp-image-generation")
|
| 165 |
response = image_model.generate_content(prompt)
|
| 166 |
|
| 167 |
if response.parts and hasattr(response.parts[0], "inline_data"):
|
| 168 |
image_data = response.parts[0].inline_data.data
|
| 169 |
access_token = os.environ.get("IMGUR_ACCESS_TOKEN")
|
| 170 |
-
image_url = upload_image_to_imgur_with_token(image_data,access_token)
|
| 171 |
-
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
|
| 175 |
-
|
| 176 |
-
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
|
|
|
|
|
|
| 180 |
else:
|
| 181 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成失敗,請稍後再試~"))
|
| 182 |
return
|
|
@@ -195,14 +169,7 @@ def handle_image_message(event):
|
|
| 195 |
image_path = previous_message["content"]
|
| 196 |
user_text = event.message.text
|
| 197 |
store_user_message(user_id, "text", user_text)
|
| 198 |
-
|
| 199 |
-
if not os.path.exists(image_path):
|
| 200 |
-
raise FileNotFoundError(f"圖片路徑無效:{image_path}")
|
| 201 |
-
organ = PIL.Image.open(image_path)
|
| 202 |
-
completion = chat.send_message([user_text, organ])
|
| 203 |
-
out = completion.text
|
| 204 |
-
except Exception as e:
|
| 205 |
-
out = f"發生錯誤: {e}"
|
| 206 |
else:
|
| 207 |
if event.message.type != "text":
|
| 208 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~"))
|
|
@@ -221,9 +188,8 @@ def handle_image_message(event):
|
|
| 221 |
|
| 222 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
|
| 223 |
|
| 224 |
-
|
| 225 |
if __name__ == "__main__":
|
| 226 |
-
# 啟動 FastAPI 應用程式
|
| 227 |
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
| 228 |
|
| 229 |
# 註解說明:
|
|
|
|
| 1 |
import json, os
|
| 2 |
import gradio as gr
|
| 3 |
from fastapi.middleware.cors import CORSMiddleware
|
| 4 |
+
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
|
| 5 |
import google.generativeai as genai
|
| 6 |
import base64
|
| 7 |
from collections import defaultdict
|
| 8 |
from linebot import LineBotApi, WebhookHandler
|
| 9 |
from linebot.exceptions import InvalidSignatureError
|
| 10 |
+
from linebot.models import (
|
| 11 |
+
MessageEvent,
|
| 12 |
+
TextMessage,
|
| 13 |
+
TextSendMessage,
|
| 14 |
+
ImageSendMessage,
|
| 15 |
+
AudioMessage,
|
| 16 |
+
ImageMessage,
|
| 17 |
+
)
|
| 18 |
import PIL.Image
|
| 19 |
import tempfile
|
| 20 |
+
import httpx
|
| 21 |
+
import uvicorn
|
| 22 |
|
| 23 |
# 設定 Google AI API 金鑰
|
| 24 |
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
|
| 25 |
|
| 26 |
# 設定生成文字的參數
|
| 27 |
+
generation_config = genai.types.GenerationConfig(
|
| 28 |
+
max_output_tokens=1000, temperature=0.2, top_p=0.5, top_k=16
|
| 29 |
+
)
|
| 30 |
|
| 31 |
+
# 使用 Gemini 模型
|
| 32 |
+
model = genai.GenerativeModel(
|
| 33 |
+
"gemini-2.0-flash-exp",
|
| 34 |
+
system_instruction="主要用繁體中文回答,但如果用戶使用詢問英文問題,就用英文回應。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題",
|
| 35 |
+
generation_config=generation_config,
|
| 36 |
+
)
|
| 37 |
|
| 38 |
+
# 初始化 LINE Bot
|
| 39 |
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
|
| 40 |
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
|
| 41 |
|
|
|
|
|
|
|
|
|
|
| 42 |
# 建立 FastAPI 應用程式
|
| 43 |
app = FastAPI()
|
| 44 |
|
| 45 |
+
# 設定 CORS
|
| 46 |
app.add_middleware(
|
| 47 |
CORSMiddleware,
|
| 48 |
allow_origins=["*"],
|
|
|
|
| 51 |
allow_headers=["*"],
|
| 52 |
)
|
| 53 |
|
|
|
|
| 54 |
@app.get("/")
|
| 55 |
def root():
|
| 56 |
return {"title": "Line Bot"}
|
| 57 |
|
|
|
|
| 58 |
@app.post("/webhook")
|
| 59 |
+
async def webhook(request: Request, background_tasks: BackgroundTasks, x_line_signature=Header(None)):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 60 |
body = await request.body()
|
| 61 |
try:
|
|
|
|
| 62 |
background_tasks.add_task(
|
| 63 |
line_handler.handle, body.decode("utf-8"), x_line_signature
|
| 64 |
)
|
| 65 |
except InvalidSignatureError:
|
|
|
|
| 66 |
raise HTTPException(status_code=400, detail="Invalid signature")
|
| 67 |
return "ok"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 68 |
|
| 69 |
+
# Imgur 圖片上傳功能(接收 image_binary,不用暫存)
|
| 70 |
+
def upload_image_to_imgur_with_token(image_binary, access_token):
|
| 71 |
+
try:
|
| 72 |
headers = {"Authorization": f"Bearer {access_token}"}
|
|
|
|
| 73 |
data = {
|
| 74 |
"image": base64.b64encode(image_binary).decode("utf-8"),
|
| 75 |
"type": "base64",
|
| 76 |
}
|
|
|
|
| 77 |
response = httpx.post("https://api.imgur.com/3/image", headers=headers, data=data)
|
| 78 |
if response.status_code == 200:
|
| 79 |
return response.json()["data"]["link"]
|
|
|
|
| 84 |
print("圖片上傳例外:", e)
|
| 85 |
return None
|
| 86 |
|
| 87 |
+
# 接收使用者圖片
|
|
|
|
|
|
|
|
|
|
| 88 |
def get_image_url(message_id):
|
| 89 |
try:
|
| 90 |
message_content = line_bot_api.get_message_content(message_id)
|
|
|
|
| 97 |
print(f"Error getting image: {e}")
|
| 98 |
return None
|
| 99 |
|
| 100 |
+
# 分析用戶圖片+問題
|
| 101 |
+
def analyze_with_gemini(image_path, user_text, chat):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 102 |
try:
|
|
|
|
| 103 |
if not os.path.exists(image_path):
|
| 104 |
raise FileNotFoundError(f"圖片路徑無效:{image_path}")
|
|
|
|
| 105 |
organ = PIL.Image.open(image_path)
|
| 106 |
response = chat.send_message([user_text, organ])
|
|
|
|
|
|
|
| 107 |
return response.text
|
|
|
|
| 108 |
except Exception as e:
|
| 109 |
return f"發生錯誤: {e}"
|
| 110 |
+
|
| 111 |
+
# 儲存使用者訊息
|
| 112 |
+
user_message_history = defaultdict(list)
|
| 113 |
+
def store_user_message(user_id, message_type, message_content):
|
| 114 |
+
user_message_history[user_id].append({
|
| 115 |
+
"type": message_type,
|
| 116 |
+
"content": message_content
|
| 117 |
+
})
|
| 118 |
|
| 119 |
def get_previous_message(user_id):
|
|
|
|
|
|
|
|
|
|
| 120 |
if user_id in user_message_history and len(user_message_history[user_id]) > 0:
|
|
|
|
| 121 |
return user_message_history[user_id][-1]
|
| 122 |
return None
|
| 123 |
|
| 124 |
+
# 主訊息處理邏輯
|
|
|
|
|
|
|
|
|
|
| 125 |
chat_sessions = {}
|
| 126 |
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
|
| 127 |
def handle_image_message(event):
|
|
|
|
| 133 |
|
| 134 |
if user_text and user_text.startswith("請幫我生成圖片"):
|
| 135 |
prompt = user_text.replace("請幫我生成圖片", "").strip()
|
|
|
|
| 136 |
image_model = genai.GenerativeModel("gemini-2.0-flash-exp-image-generation")
|
| 137 |
response = image_model.generate_content(prompt)
|
| 138 |
|
| 139 |
if response.parts and hasattr(response.parts[0], "inline_data"):
|
| 140 |
image_data = response.parts[0].inline_data.data
|
| 141 |
access_token = os.environ.get("IMGUR_ACCESS_TOKEN")
|
| 142 |
+
image_url = upload_image_to_imgur_with_token(image_data, access_token)
|
| 143 |
+
|
| 144 |
+
if image_url:
|
| 145 |
+
line_bot_api.reply_message(
|
| 146 |
+
event.reply_token,
|
| 147 |
+
[
|
| 148 |
+
TextSendMessage(text="這是我為你生成的圖片喔~ ✨"),
|
| 149 |
+
ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
|
| 150 |
+
]
|
| 151 |
+
)
|
| 152 |
+
else:
|
| 153 |
+
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片上傳失敗,請稍後再試~"))
|
| 154 |
else:
|
| 155 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成失敗,請稍後再試~"))
|
| 156 |
return
|
|
|
|
| 169 |
image_path = previous_message["content"]
|
| 170 |
user_text = event.message.text
|
| 171 |
store_user_message(user_id, "text", user_text)
|
| 172 |
+
out = analyze_with_gemini(image_path, user_text, chat)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 173 |
else:
|
| 174 |
if event.message.type != "text":
|
| 175 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~"))
|
|
|
|
| 188 |
|
| 189 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
|
| 190 |
|
| 191 |
+
# 啟動應用
|
| 192 |
if __name__ == "__main__":
|
|
|
|
| 193 |
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
| 194 |
|
| 195 |
# 註解說明:
|