"""Gradio app: chat with an uploaded table/document, a YouTube transcript, or a web page.

The module wires three input sources (file upload, YouTube link, web link) to
OpenAI-generated summaries and suggested questions, caches YouTube transcripts
in Google Drive, and launches the Gradio UI at import time.
"""

import html
import io
import json
import os
from urllib.parse import urlparse, parse_qs

import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
from openai import OpenAI
from youtube_transcript_api import YouTubeTranscriptApi
from moviepy.editor import VideoFileClip
from pytube import YouTube
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseUpload

# Assumes the environment variable / secret is named GOOGLE_APPLICATION_CREDENTIALS_JSON.
# Debug snippet (kept disabled): list the first 10 files on Google Drive.
# credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
# credentials_dict = json.loads(credentials_json_string)
# SCOPES = ['https://www.googleapis.com/auth/drive']
# credentials = service_account.Credentials.from_service_account_info(
#     credentials_dict, scopes=SCOPES)
# service = build('drive', 'v3', credentials=credentials)
# results = service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
# items = results.get('files', [])
# if not items:
#     print('No files found.')
# else:
#     print("=====First 10 files on Google Drive=====")
#     print('Files:')
#     for item in items:
#         print(u'{0} ({1})'.format(item['name'], item['id']))

OUTPUT_PATH = 'videos'

OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
client = OpenAI(api_key=OPEN_AI_KEY)


def _escape_drive_query_value(value):
    """Escape backslashes and single quotes so *value* can sit inside a quoted Drive query term."""
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def _first_three_questions(questions):
    """Return exactly three question strings, padding with "" when fewer are available."""
    padded = list(questions[:3])
    padded += [""] * (3 - len(padded))
    return tuple(padded)


def init_drive_service():
    """Build a Google Drive v3 client from the service-account JSON in the environment.

    Raises:
        RuntimeError: if GOOGLE_APPLICATION_CREDENTIALS_JSON is not set.
    """
    credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    if not credentials_json_string:
        # Fail with a clear message instead of json.loads(None) raising TypeError.
        raise RuntimeError(
            "Environment variable GOOGLE_APPLICATION_CREDENTIALS_JSON is not set"
        )
    credentials_dict = json.loads(credentials_json_string)
    SCOPES = ['https://www.googleapis.com/auth/drive']
    credentials = service_account.Credentials.from_service_account_info(
        credentials_dict, scopes=SCOPES)
    service = build('drive', 'v3', credentials=credentials)
    return service


def create_folder_if_not_exists(service, folder_name, parent_id):
    """Return the id of the folder *folder_name* under *parent_id*, creating it if absent."""
    print("检查是否存在特定名称的文件夹,如果不存在则创建")
    safe_name = _escape_drive_query_value(folder_name)
    safe_parent = _escape_drive_query_value(parent_id)
    query = (
        f"mimeType='application/vnd.google-apps.folder' and name='{safe_name}' "
        f"and '{safe_parent}' in parents and trashed=false"
    )
    response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
    folders = response.get('files', [])
    if folders:
        # The folder already exists; reuse the first match.
        return folders[0]['id']

    # The folder does not exist; create it.
    file_metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder',
        'parents': [parent_id]
    }
    folder = service.files().create(body=file_metadata, fields='id').execute()
    return folder.get('id')


def check_file_exists(service, folder_name, file_name):
    """Check whether *file_name* exists in a Drive folder.

    Note that *folder_name* is used as the parent folder **id** in the query
    (the caller in this file passes a folder id).

    Returns:
        A ``(exists, file_id)`` tuple; ``file_id`` is None when nothing matches.
    """
    safe_file = _escape_drive_query_value(file_name)
    safe_folder = _escape_drive_query_value(folder_name)
    query = f"name = '{safe_file}' and '{safe_folder}' in parents and trashed = false"
    response = service.files().list(q=query).execute()
    files = response.get('files', [])
    return len(files) > 0, files[0]['id'] if files else None


def upload_to_drive(service, file_name, folder_id, content):
    """Upload the text *content* as a plain-text file named *file_name* into *folder_id*."""
    print("上传文本内容到Google Drive指定的文件夹中")
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    # MediaFileUpload expects a path on disk; in-memory content must go through
    # MediaIoBaseUpload with a bytes stream, so encode the text first.
    stream = io.BytesIO(content.encode('utf-8'))
    media = MediaIoBaseUpload(stream, mimetype='text/plain')
    service.files().create(body=file_metadata, media_body=media, fields='id').execute()


def process_file(file):
    """Turn an uploaded .csv/.xlsx/.docx file into text and derive questions and a summary.

    Returns:
        ``(question_1, question_2, question_3, summary, text)``; all empty
        strings when *file* is None (upload cleared).

    Raises:
        ValueError: if the file extension is not supported.
    """
    if file is None:
        return "", "", "", "", ""

    # Read the file according to its extension.
    file_name = file.name
    if file_name.endswith('.csv'):
        text = df_to_text(pd.read_csv(file))
    elif file_name.endswith('.xlsx'):
        text = df_to_text(pd.read_excel(file))
    elif file_name.endswith('.docx'):
        text = docx_to_text(file)
    else:
        raise ValueError("Unsupported file type")

    # Yilan dataset: replace the @XX@ marker with |.
    df_string = text.replace("@XX@", "|")

    # Generate questions and a summary from the uploaded content.
    questions = generate_questions(df_string)
    df_summarise = generate_df_summarise(df_string)

    # Return the button texts, the summary, and the content string.
    return (*_first_three_questions(questions), df_summarise, df_string)


def df_to_text(df):
    """Render a DataFrame as plain text."""
    return df.to_string()


def docx_to_text(file):
    """Return the paragraphs of a Word document joined by newlines."""
    doc = Document(file)
    return "\n".join(para.text for para in doc.paragraphs)


def format_seconds_to_time(seconds):
    """Format a number of seconds as HH:MM:SS (fractions of a second are truncated)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"


def extract_youtube_id(url):
    """Extract the video id from a youtube.com or youtu.be URL, or return None."""
    parsed_url = urlparse(url)
    if "youtube.com" in parsed_url.netloc:
        # Standard links carry the video id in the 'v' query parameter.
        return parse_qs(parsed_url.query).get("v", [None])[0]
    if "youtu.be" in parsed_url.netloc:
        # Short links carry the video id as the path.
        return parsed_url.path.lstrip('/')
    return None


def process_youtube_link(link):
    """Fetch a video's zh-TW transcript, cache it on Drive, and build the UI outputs.

    Returns:
        ``(question_1, question_2, question_3, summary, transcript_html,
        screenshot_paths)``; empty values when *link* is blank.

    Raises:
        ValueError: if no video id can be extracted from *link*.
    """
    if not link or not link.strip():
        return "", "", "", "", "", []

    video_id = extract_youtube_id(link)
    if not video_id:
        # Stop before touching Drive; otherwise a folder named "None" would be created.
        raise ValueError(f"Could not extract a YouTube video id from: {link!r}")

    service = init_drive_service()
    parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'  # Drive folder for YouTube transcripts/screenshots

    # Find or create the sub-folder named after the video id.
    folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
    file_name = f"{video_id}_transcript.txt"

    # Check whether the transcript is already stored on Drive.
    exists, file_id = check_file_exists(service, folder_id, file_name)

    # Fetch the transcript once; it is needed whether or not it is already on Drive.
    transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['zh-TW'])
    if not exists:
        transcript_text = "\n".join(f"{item['start']}: {item['text']}" for item in transcript)
        upload_to_drive(service, file_name, folder_id, transcript_text)
        print("逐字稿已上传到Google Drive")
    else:
        print("逐字稿已存在于Google Drive中")

    # Derive the other outputs from the transcript.
    questions = generate_questions(transcript)
    df_summarise = generate_df_summarise(transcript)

    formatted_transcript = []
    screenshot_paths = []
    for entry in transcript:
        start_time = format_seconds_to_time(entry['start'])
        end_time = format_seconds_to_time(entry['start'] + entry['duration'])
        embed_url = get_embedded_youtube_link(video_id, entry['start'])
        # Take a screenshot at the start of this transcript entry.
        screenshot_path = screenshot_youtube_video(video_id, entry['start'])
        formatted_transcript.append({
            "start_time": start_time,
            "end_time": end_time,
            "text": entry['text'],
            "embed_url": embed_url,
            "screenshot_path": screenshot_path
        })
        screenshot_paths.append(screenshot_path)

    html_content = format_transcript_to_html(formatted_transcript)
    print("=====html_content=====")
    print(html_content)
    print("=====html_content=====")

    # The tuple must match the outputs list wired to youtube_link.change.
    return (
        *_first_three_questions(questions),
        df_summarise,
        html_content,
        screenshot_paths,
    )


def format_transcript_to_html(formatted_transcript):
    """Render transcript entries as HTML: a time-range heading and a text paragraph each."""
    # NOTE(review): the markup of the original f-strings was not recoverable from
    # the source as received; <h3>/<p> is a reconstruction — confirm against the
    # intended layout (e.g. whether a screenshot <img> should also be emitted).
    parts = []
    for entry in formatted_transcript:
        parts.append(f"<h3>{entry['start_time']} - {entry['end_time']}</h3>")
        # Transcript text comes from YouTube, so escape it before embedding in HTML.
        parts.append(f"<p>{html.escape(entry['text'])}</p>")
    return "".join(parts)


def get_embedded_youtube_link(video_id, start_time):
    """Return an autoplaying embed URL for *video_id* starting at *start_time* seconds."""
    # Transcript offsets are fractional; the URL is built with whole seconds.
    embed_url = f"https://www.youtube.com/embed/{video_id}?start={int(start_time)}&autoplay=1"
    return embed_url


def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
    """Download the highest-resolution progressive MP4 of a video to *output_path*.

    Raises:
        RuntimeError: if the video has no progressive MP4 stream.
    """
    # Construct the full YouTube URL
    youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'

    # Create the output directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)

    # Download the video
    yt = YouTube(youtube_url)
    video_stream = (
        yt.streams.filter(progressive=True, file_extension='mp4')
        .order_by('resolution')
        .desc()
        .first()
    )
    if video_stream is None:
        raise RuntimeError(f"No progressive MP4 stream available for video {youtube_id}")
    video_stream.download(output_path=output_path, filename=youtube_id+".mp4")

    print(f"Video downloaded successfully: {output_path}/{youtube_id}.mp4")


def screenshot_youtube_video(youtube_id, snapshot_sec):
    """Save the frame at *snapshot_sec* of a video as a JPEG and return its path."""
    video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'

    # This is called once per transcript entry, so download only when the file
    # is not already on disk instead of re-downloading for every screenshot.
    if not os.path.exists(video_path):
        download_youtube_video(youtube_id, output_path=OUTPUT_PATH)

    # Load the video and take a screenshot
    with VideoFileClip(video_path) as video:
        screenshot_path = f'{OUTPUT_PATH}/{youtube_id}_{snapshot_sec}.jpg'
        video.save_frame(screenshot_path, snapshot_sec)

    return screenshot_path


def get_screenshot_from_video(video_link, start_time):
    """Placeholder: return a descriptive string instead of a real screenshot URL.

    A real implementation would need server-side processing (download the
    video, then extract the frame with ffmpeg or a similar tool).
    """
    screenshot_url = f"[逻辑以提取视频 {video_link} 在 {start_time} 秒时的截图]"
    return screenshot_url


def process_web_link(link):
    """Fetch a web page, extract its text, and derive questions and a summary.

    Returns:
        ``(question_1, question_2, question_3, summary, page_text)`` — the five
        values wired to web_link.change; all empty strings when *link* is blank.

    Raises:
        requests.RequestException: if the page cannot be fetched.
    """
    if not link or not link.strip():
        return "", "", "", "", ""

    # Fetch and parse the page; the timeout keeps a dead host from hanging the UI.
    response = requests.get(link, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
    page_text = soup.get_text()

    questions = generate_questions(page_text)
    df_summarise = generate_df_summarise(page_text)
    return (*_first_three_questions(questions), df_summarise, page_text)


def generate_df_summarise(df_string):
    """Ask the OpenAI chat model for a teacher-oriented summary of the given data."""
    sys_content = "你是一個資料分析師,服務對象為老師,請精讀資料,使用 zh-TW"
    user_content = f"請根據 {df_string},大概描述這張表的欄位敘述、資料樣態與資料分析,告訴老師這張表的意義,以及可能的結論與對應方式"
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    print("=====messages=====")
    print(messages)
    print("=====messages=====")

    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000,
    }

    response = client.chat.completions.create(**request_payload)
    df_summarise = response.choices[0].message.content.strip()
    print("=====df_summarise=====")
    print(df_summarise)
    print("=====df_summarise=====")

    return df_summarise


def generate_questions(df_string):
    """Ask the OpenAI chat model for suggested questions about the given data.

    Returns:
        The value of the "questions" key in the model's JSON reply.
    """
    sys_content = "你是一個資料分析師,user為老師,請精讀資料,並用既有資料為本質猜測用戶可能會問的問題,使用 zh-TW"
    user_content = f"請根據 {df_string} 生成三個問題,並用 JSON 格式返回 questions:[q1, q2, q3]"
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    response_format = {"type": "json_object"}

    print("=====messages=====")
    print(messages)
    print("=====messages=====")

    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000,
        "response_format": response_format
    }

    response = client.chat.completions.create(**request_payload)
    questions = json.loads(response.choices[0].message.content)["questions"]
    print("=====json_response=====")
    print(questions)
    print("=====json_response=====")

    return questions


def send_question(question, df_string_output, chat_history):
    """Forward a suggested question to respond(); intended for question-button clicks."""
    return respond(question, df_string_output, chat_history)


def respond(user_message, df_string_output, chat_history):
    """Answer *user_message* against the loaded data and append the turn to the chat history.

    Returns:
        ``("", chat_history)`` — the empty string clears the input box.
    """
    print("=== 變數:user_message ===")
    print(user_message)
    print("=== 變數:chat_history ===")
    print(chat_history)

    sys_content = f"你是一個資料分析師,請用 {df_string_output} 為資料進行對話,使用 zh-TW"
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_message}
    ]

    print("=====messages=====")
    print(messages)
    print("=====messages=====")

    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000  # A generous limit; adjust as needed.
    }

    response = client.chat.completions.create(**request_payload)
    print(response)
    response_text = response.choices[0].message.content.strip()

    # Update the chat history.
    new_chat_history = (user_message, response_text)
    if chat_history is None:
        chat_history = [new_chat_history]
    else:
        chat_history.append(new_chat_history)

    # Return an empty string (clears the input box) and the chat history.
    return "", chat_history


with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            file_upload = gr.File(label="Upload your CSV or Word file")
            youtube_link = gr.Textbox(label="Enter YouTube Link")
            web_link = gr.Textbox(label="Enter Web Page Link")
            chatbot = gr.Chatbot()
            msg = gr.Textbox(label="Message")
            send_button = gr.Button("Send")

        with gr.Column():
            with gr.Tab("YouTube Transcript and Video"):
                transcript_html = gr.HTML(label="YouTube Transcript and Video")
            with gr.Tab("images"):
                gallery = gr.Gallery(label="截图")
            with gr.Tab("資料本文"):
                df_string_output = gr.Textbox()
            with gr.Tab("資料摘要"):
                gr.Markdown("## 這是什麼樣的資料?")
                df_summarise = gr.Textbox(container=True, show_copy_button=True, label="資料本文", lines=40)
            with gr.Tab("常用問題"):
                gr.Markdown("## 常用問題")
                btn_1 = gr.Button()
                btn_2 = gr.Button()
                btn_3 = gr.Button()

    send_button.click(
        respond,
        inputs=[msg, df_string_output, chatbot],
        outputs=[msg, chatbot]
    )

    # Wire the suggested-question buttons; each button's label is sent as the message.
    btn_1.click(respond, inputs=[btn_1, df_string_output, chatbot], outputs=[msg, chatbot])
    btn_2.click(respond, inputs=[btn_2, df_string_output, chatbot], outputs=[msg, chatbot])
    btn_3.click(respond, inputs=[btn_3, df_string_output, chatbot], outputs=[msg, chatbot])

    # file_upload.change(process_file, inputs=file_upload, outputs=df_string_output)
    file_upload.change(process_file, inputs=file_upload, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])

    # Triggered when a YouTube link is entered.
    youtube_link.change(process_youtube_link, inputs=youtube_link, outputs=[btn_1, btn_2, btn_3, df_summarise, transcript_html, gallery])

    # Triggered when a web page link is entered.
    web_link.change(process_web_link, inputs=web_link, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])

demo.launch(allowed_paths=["videos"])