video_bot_999 / app.py
youngtsai's picture
import io
3310ad5
raw
history blame
No virus
16.1 kB
import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
import os
from openai import OpenAI
import json
from youtube_transcript_api import YouTubeTranscriptApi
from moviepy.editor import VideoFileClip
from pytube import YouTube
import os
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
import io
from urllib.parse import urlparse, parse_qs
# 假设您的环境变量或Secret的名称是GOOGLE_APPLICATION_CREDENTIALS_JSON
# credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
# credentials_dict = json.loads(credentials_json_string)
# SCOPES = ['https://www.googleapis.com/auth/drive']
# credentials = service_account.Credentials.from_service_account_info(
# credentials_dict, scopes=SCOPES)
# service = build('drive', 'v3', credentials=credentials)
# # 列出 Google Drive 上的前10個文件
# results = service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
# items = results.get('files', [])
# if not items:
# print('No files found.')
# else:
# print("=====Google Drive 上的前10個文件=====")
# print('Files:')
# for item in items:
# print(u'{0} ({1})'.format(item['name'], item['id']))
OUTPUT_PATH = 'videos'
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
client = OpenAI(api_key=OPEN_AI_KEY)
# 初始化Google Drive服务
def init_drive_service():
credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
credentials_dict = json.loads(credentials_json_string)
SCOPES = ['https://www.googleapis.com/auth/drive']
credentials = service_account.Credentials.from_service_account_info(
credentials_dict, scopes=SCOPES)
service = build('drive', 'v3', credentials=credentials)
return service
def create_folder_if_not_exists(service, folder_name, parent_id):
print("检查是否存在特定名称的文件夹,如果不存在则创建")
query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_id}' in parents and trashed=false"
response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
folders = response.get('files', [])
if not folders:
# 文件夹不存在,创建新文件夹
file_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent_id]
}
folder = service.files().create(body=file_metadata, fields='id').execute()
return folder.get('id')
else:
# 文件夹已存在
return folders[0]['id']
# 检查Google Drive上是否存在文件
def check_file_exists(service, folder_name, file_name):
query = f"name = '{file_name}' and '{folder_name}' in parents and trashed = false"
response = service.files().list(q=query).execute()
files = response.get('files', [])
return len(files) > 0, files[0]['id'] if files else None
# 上传文件到Google Drive
def upload_to_drive(service, file_name, folder_id, content):
file_metadata = {
'name': file_name,
'parents': [folder_id]
}
media = MediaIoBaseUpload(io.BytesIO(content.encode()), mimetype='text/plain')
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
return file.get('id')
def upload_to_drive(service, file_name, folder_id, content):
print("上传文本内容到Google Drive指定的文件夹中")
# 如果您的内容是字符串(文本),请使用io.StringIO
# 对于二进制内容,请使用io.BytesIO
file_metadata = {'name': file_name, 'parents': [folder_id]}
# 这里我们假定content是文本,因此使用io.StringIO
media = MediaFileUpload(io.StringIO(content), mimetype='text/plain')
service.files().create(body=file_metadata, media_body=media, fields='id').execute()
def process_file(file):
# 读取文件
if file.name.endswith('.csv'):
df = pd.read_csv(file)
text = df_to_text(df)
elif file.name.endswith('.xlsx'):
df = pd.read_excel(file)
text = df_to_text(df)
elif file.name.endswith('.docx'):
text = docx_to_text(file)
else:
raise ValueError("Unsupported file type")
df_string = df.to_string()
# 宜蘭:移除@XX@符号 to |
df_string = df_string.replace("@XX@", "|")
# 根据上传的文件内容生成问题
questions = generate_questions(df_string)
df_summarise = generate_df_summarise(df_string)
# 返回按钮文本和 DataFrame 字符串
return questions[0] if len(questions) > 0 else "", \
questions[1] if len(questions) > 1 else "", \
questions[2] if len(questions) > 2 else "", \
df_summarise, \
df_string
def df_to_text(df):
# 将 DataFrame 转换为纯文本
return df.to_string()
def docx_to_text(file):
# 将 Word 文档转换为纯文本
doc = Document(file)
return "\n".join([para.text for para in doc.paragraphs])
def format_seconds_to_time(seconds):
"""将秒数格式化为 时:分:秒 的形式"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02}:{minutes:02}:{seconds:02}"
def extract_youtube_id(url):
parsed_url = urlparse(url)
if "youtube.com" in parsed_url.netloc:
# 对于标准链接,视频ID在查询参数'v'中
query_params = parse_qs(parsed_url.query)
return query_params.get("v")[0] if "v" in query_params else None
elif "youtu.be" in parsed_url.netloc:
# 对于短链接,视频ID是路径的一部分
return parsed_url.path.lstrip('/')
else:
return None
def process_youtube_link(link):
# 使用 YouTube API 获取逐字稿
# 假设您已经获取了 YouTube 视频的逐字稿并存储在变量 `transcript` 中
video_id = extract_youtube_id(link)
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL' # youtube逐字稿圖檔的ID
# 检查/创建视频ID命名的子文件夹
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f"{video_id}_transcript.txt"
# 检查逐字稿是否存在
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
# 获取逐字稿
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['zh-TW'])
transcript_text = "\n".join([f"{item['start']}: {item['text']}" for item in transcript])
# 上传到Google Drive
upload_to_drive(service, file_name, folder_id, transcript_text)
print("逐字稿已上传到Google Drive")
else:
print("逐字稿已存在于Google Drive中")
# 再取得 transcript
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['zh-TW'])
# 基于逐字稿生成其他所需的输出
questions = generate_questions(transcript)
df_summarise = generate_df_summarise(transcript)
formatted_transcript = []
screenshot_paths = []
for entry in transcript:
start_time = format_seconds_to_time(entry['start'])
end_time = format_seconds_to_time(entry['start'] + entry['duration'])
embed_url = get_embedded_youtube_link(video_id, entry['start'])
# 截圖
screenshot_path = screenshot_youtube_video(video_id, entry['start'])
line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text'],
"embed_url": embed_url,
"screenshot_path": screenshot_path
}
formatted_transcript.append(line)
screenshot_paths.append(screenshot_path)
html_content = format_transcript_to_html(formatted_transcript)
print("=====html_content=====")
print(html_content)
print("=====html_content=====")
# 确保返回与 UI 组件预期匹配的输出
return questions[0] if len(questions) > 0 else "", \
questions[1] if len(questions) > 1 else "", \
questions[2] if len(questions) > 2 else "", \
df_summarise, \
html_content, \
screenshot_paths,
def format_transcript_to_html(formatted_transcript):
html_content = ""
for entry in formatted_transcript:
html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
html_content += f"<p>{entry['text']}</p>"
html_content += f"<img src='{entry['screenshot_path']}' width='500px' />"
return html_content
def get_embedded_youtube_link(video_id, start_time):
embed_url = f"https://www.youtube.com/embed/{video_id}?start={start_time}&autoplay=1"
return embed_url
def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
# Construct the full YouTube URL
youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'
# Create the output directory if it doesn't exist
if not os.path.exists(output_path):
os.makedirs(output_path)
# Download the video
yt = YouTube(youtube_url)
video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
video_stream.download(output_path=output_path, filename=youtube_id+".mp4")
print(f"Video downloaded successfully: {output_path}/{youtube_id}.mp4")
def screenshot_youtube_video(youtube_id, snapshot_sec):
# 先下載 video
download_youtube_video(youtube_id, output_path=OUTPUT_PATH)
# 这里假设视频已经在适当的位置
video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'
# Load the video and take a screenshot
with VideoFileClip(video_path) as video:
screenshot_path = f'{OUTPUT_PATH}/{youtube_id}_{snapshot_sec}.jpg'
video.save_frame(screenshot_path, snapshot_sec)
return screenshot_path
def get_screenshot_from_video(video_link, start_time):
# 实现从视频中提取帧的逻辑
# 由于这需要服务器端处理,你可能需要一种方法来下载视频,
# 并使用 ffmpeg 或类似工具提取特定时间点的帧
# 这里只是一个示意性的函数实现
screenshot_url = f"[逻辑以提取视频 {video_link}{start_time} 秒时的截图]"
return screenshot_url
def process_web_link(link):
# 抓取和解析网页内容
response = requests.get(link)
soup = BeautifulSoup(response.content, 'html.parser')
return soup.get_text()
def generate_df_summarise(df_string):
# 使用 OpenAI 生成基于上传数据的问题
sys_content = "你是一個資料分析師,服務對象為老師,請精讀資料,使用 zh-TW"
user_content = f"請根據 {df_string},大概描述這張表的欄位敘述、資料樣態與資料分析,告訴老師這張表的意義,以及可能的結論與對應方式"
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
print("=====messages=====")
print(messages)
print("=====messages=====")
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000,
}
response = client.chat.completions.create(**request_payload)
df_summarise = response.choices[0].message.content.strip()
print("=====df_summarise=====")
print(df_summarise)
print("=====df_summarise=====")
return df_summarise
def generate_questions(df_string):
# 使用 OpenAI 生成基于上传数据的问题
sys_content = "你是一個資料分析師,user為老師,請精讀資料,並用既有資料為本質猜測用戶可能會問的問題,使用 zh-TW"
user_content = f"請根據 {df_string} 生成三個問題,並用 JSON 格式返回 questions:[q1, q2, q3]"
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
response_format = { "type": "json_object" }
print("=====messages=====")
print(messages)
print("=====messages=====")
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000,
"response_format": response_format
}
response = client.chat.completions.create(**request_payload)
questions = json.loads(response.choices[0].message.content)["questions"]
print("=====json_response=====")
print(questions)
print("=====json_response=====")
return questions
def send_question(question, df_string_output, chat_history):
# 当问题按钮被点击时调用此函数
return respond(question, df_string_output, chat_history)
def respond(user_message, df_string_output, chat_history):
print("=== 變數:user_message ===")
print(user_message)
print("=== 變數:chat_history ===")
print(chat_history)
sys_content = f"你是一個資料分析師,請用 {df_string_output} 為資料進行對話,使用 zh-TW"
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_message}
]
print("=====messages=====")
print(messages)
print("=====messages=====")
request_payload = {
"model": "gpt-4-1106-preview",
"messages": messages,
"max_tokens": 4000 # 設定一個較大的值,可根據需要調整
}
response = client.chat.completions.create(**request_payload)
print(response)
response_text = response.choices[0].message.content.strip()
# 更新聊天历史
new_chat_history = (user_message, response_text)
if chat_history is None:
chat_history = [new_chat_history]
else:
chat_history.append(new_chat_history)
# 返回聊天历史和空字符串清空输入框
return "", chat_history
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
file_upload = gr.File(label="Upload your CSV or Word file")
youtube_link = gr.Textbox(label="Enter YouTube Link")
web_link = gr.Textbox(label="Enter Web Page Link")
chatbot = gr.Chatbot()
msg = gr.Textbox(label="Message")
send_button = gr.Button("Send")
with gr.Column():
with gr.Tab("YouTube Transcript and Video"):
transcript_html = gr.HTML(label="YouTube Transcript and Video")
with gr.Tab("images"):
gallery = gr.Gallery(label="截图")
with gr.Tab("資料本文"):
df_string_output = gr.Textbox()
with gr.Tab("資料摘要"):
gr.Markdown("## 這是什麼樣的資料?")
df_summarise = gr.Textbox(container=True, show_copy_button=True, label="資料本文", lines=40)
with gr.Tab("常用問題"):
gr.Markdown("## 常用問題")
btn_1 = gr.Button()
btn_2 = gr.Button()
btn_3 = gr.Button()
send_button.click(
respond,
inputs=[msg, df_string_output, chatbot],
outputs=[msg, chatbot]
)
# 连接按钮点击事件
btn_1.click(respond, inputs=[btn_1, df_string_output, chatbot], outputs=[msg, chatbot])
btn_2.click(respond, inputs=[btn_2, df_string_output, chatbot], outputs=[msg, chatbot])
btn_3.click(respond, inputs=[btn_3, df_string_output, chatbot], outputs=[msg, chatbot])
# file_upload.change(process_file, inputs=file_upload, outputs=df_string_output)
file_upload.change(process_file, inputs=file_upload, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])
# 当输入 YouTube 链接时触发
youtube_link.change(process_youtube_link, inputs=youtube_link, outputs=[btn_1, btn_2, btn_3, df_summarise, transcript_html, gallery])
# 当输入网页链接时触发
web_link.change(process_web_link, inputs=web_link, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])
demo.launch(allowed_paths=["videos"])