import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
import os
from openai import OpenAI
import json
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import NoTranscriptFound
from moviepy.editor import VideoFileClip
from pytube import YouTube
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.http import MediaIoBaseUpload
import io
from urllib.parse import urlparse, parse_qs

# Assumes the environment variable / Secret is named GOOGLE_APPLICATION_CREDENTIALS_JSON
# credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
# credentials_dict = json.loads(credentials_json_string)
# SCOPES = ['https://www.googleapis.com/auth/drive']
# credentials = service_account.Credentials.from_service_account_info(
#     credentials_dict, scopes=SCOPES)
# service = build('drive', 'v3', credentials=credentials)

# # List the first 10 files on Google Drive
# results = service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
# items = results.get('files', [])
# if not items:
#     print('No files found.')
# else:
#     print("=====First 10 files on Google Drive=====")
#     print('Files:')
#     for item in items:
#         print(u'{0} ({1})'.format(item['name'], item['id']))

OUTPUT_PATH = 'videos'
TRANSCRIPTS = []
CURRENT_INDEX = 0

OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
client = OpenAI(api_key=OPEN_AI_KEY)

# ====drive==== Initialize the Google Drive service
def init_drive_service():
    credentials_json_string = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    credentials_dict = json.loads(credentials_json_string)
    SCOPES = ['https://www.googleapis.com/auth/drive']
    credentials = service_account.Credentials.from_service_account_info(
        credentials_dict, scopes=SCOPES)
    service = build('drive', 'v3', credentials=credentials)
    return service
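
# Assumption: GOOGLE_APPLICATION_CREDENTIALS_JSON holds the full service-account key
# JSON as a single string (e.g. a Space secret), so it can be parsed with json.loads
# before building the Drive client.
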
def create_folder_if_not_exists(service, folder_name, parent_id):
    print("Checking whether a folder with this name exists; creating it if not")
    query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_id}' in parents and trashed=false"
    response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
    folders = response.get('files', [])
    if not folders:
        # Folder does not exist; create a new one
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder',
            'parents': [parent_id]
        }
        folder = service.files().create(body=file_metadata, fields='id').execute()
        return folder.get('id')
    else:
        # Folder already exists
        return folders[0]['id']

# Check whether a file already exists on Google Drive
def check_file_exists(service, folder_id, file_name):
    query = f"name = '{file_name}' and '{folder_id}' in parents and trashed = false"
    response = service.files().list(q=query).execute()
    files = response.get('files', [])
    return len(files) > 0, files[0]['id'] if files else None

def upload_to_drive(service, file_name, folder_id, content):
    print("Uploading text content to the specified Google Drive folder")
    # MediaFileUpload expects a file path, so wrap the text in an in-memory
    # byte stream and use MediaIoBaseUpload instead.
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    fh = io.BytesIO(content.encode('utf-8'))
    media = MediaIoBaseUpload(fh, mimetype='text/plain')
    service.files().create(body=file_metadata, media_body=media, fields='id').execute()

def upload_content_directly(service, file_name, folder_id, content):
    """
    Upload content directly to a new file on Google Drive.
    """
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    # Wrap the text content in an in-memory file object
    fh = io.BytesIO(content.encode('utf-8'))
    media = MediaIoBaseUpload(fh, mimetype='text/plain', resumable=True)
    print("==content==")
    print(content)
    print("==content==")
    print("==media==")
    print(media)
    print("==media==")
    # Perform the upload
    file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    return file.get('id')

def download_file_as_string(service, file_id):
    """
    Download a file from Google Drive and return its content as a string.
    """
    request = service.files().get_media(fileId=file_id)
    fh = io.BytesIO()
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while done is False:
        status, done = downloader.next_chunk()
    fh.seek(0)
    content = fh.read().decode('utf-8')
    return content

def upload_img_directly(service, file_name, folder_id, file_path):
    file_metadata = {'name': file_name, 'parents': [folder_id]}
    media = MediaFileUpload(file_path, mimetype='image/jpeg')
    file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    return file.get('id')  # Return the file ID

def set_public_permission(service, file_id):
    service.permissions().create(
        fileId=file_id,
        body={"type": "anyone", "role": "reader"},
        fields='id',
    ).execute()
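
# Files made public this way are later embedded in the transcript HTML via the
# https://lh3.googleusercontent.com/d/<file_id>=s4000 URL pattern (see process_youtube_link).
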
def update_file_on_drive(service, file_id, file_content):
    """
    Update the content of a file on Google Drive.

    Parameters:
    - service: Google Drive API service instance.
    - file_id: ID of the file to update.
    - file_content: new file content, as a string.
    """
    # Convert the new content into a byte stream
    fh = io.BytesIO(file_content.encode('utf-8'))
    media = MediaIoBaseUpload(fh, mimetype='application/json', resumable=True)
    # Update the file
    updated_file = service.files().update(
        fileId=file_id,
        media_body=media
    ).execute()
    print(f"File updated, file ID: {updated_file['id']}")

# ====drive====

def process_file(file):
    # Read the uploaded file
    if file.name.endswith('.csv'):
        df = pd.read_csv(file)
        text = df_to_text(df)
    elif file.name.endswith('.xlsx'):
        df = pd.read_excel(file)
        text = df_to_text(df)
    elif file.name.endswith('.docx'):
        text = docx_to_text(file)
    else:
        raise ValueError("Unsupported file type")

    # Use the extracted text directly (df is not defined for .docx uploads)
    df_string = text
    # Yilan data: replace the @XX@ marker with |
    df_string = df_string.replace("@XX@", "|")

    # Generate questions and a summary from the uploaded content
    questions = generate_questions(df_string)
    summary = generate_summarise(df_string)

    # Return the button texts and the data string
    return questions[0] if len(questions) > 0 else "", \
           questions[1] if len(questions) > 1 else "", \
           questions[2] if len(questions) > 2 else "", \
           summary, \
           df_string

def df_to_text(df):
    # Convert the DataFrame to plain text
    return df.to_string()

def docx_to_text(file):
    # Convert the Word document to plain text
    doc = Document(file)
    return "\n".join([para.text for para in doc.paragraphs])

def format_seconds_to_time(seconds):
    """Format a number of seconds as hours:minutes:seconds."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"
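
# Illustrative example (not executed): format_seconds_to_time(3725) returns "01:02:05".
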
def extract_youtube_id(url):
    parsed_url = urlparse(url)
    if "youtube.com" in parsed_url.netloc:
        # For standard links, the video ID is in the 'v' query parameter
        query_params = parse_qs(parsed_url.query)
        return query_params.get("v")[0] if "v" in query_params else None
    elif "youtu.be" in parsed_url.netloc:
        # For short links, the video ID is part of the path
        return parsed_url.path.lstrip('/')
    else:
        return None
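
# Illustrative examples (not executed), both of which would return "dQw4w9WgXcQ":
#   extract_youtube_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
#   extract_youtube_id("https://youtu.be/dQw4w9WgXcQ")
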
def get_transcript(video_id):
    languages = ['zh-TW', 'zh-Hant', 'en']  # Priority order
    for language in languages:
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
            return transcript  # Transcript found; return it immediately
        except NoTranscriptFound:
            continue  # No transcript in this language; try the next one
    return None  # All attempts failed; return None
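
# Each entry returned by youtube_transcript_api is a dict with 'text', 'start' and
# 'duration' keys, which the processing functions below rely on.
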
def process_transcript_and_screenshots(video_id):
    print("====process_transcript_and_screenshots====")
    service = init_drive_service()
    parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
    folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
    file_name = f'{video_id}_transcript.json'

    # Check whether the transcript already exists on Drive
    exists, file_id = check_file_exists(service, folder_id, file_name)
    if not exists:
        # Fetch the transcript from YouTube and upload it
        transcript = get_transcript(video_id)
        if transcript:
            print("Transcript fetched successfully")
        else:
            print("No transcript found")
        transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
        file_id = upload_content_directly(service, file_name, folder_id, transcript_text)
        print("Transcript uploaded to Google Drive")
    else:
        # Transcript already exists; download its content
        print("Transcript already exists on Google Drive")
        transcript_text = download_file_as_string(service, file_id)
        transcript = json.loads(transcript_text)

    # Process each transcript entry: take and upload a screenshot where one is missing
    for entry in transcript:
        if 'img_file_id' not in entry:
            screenshot_path = screenshot_youtube_video(video_id, entry['start'])
            img_file_id = upload_img_directly(service, f"{video_id}_{entry['start']}.jpg", folder_id, screenshot_path)
            set_public_permission(service, img_file_id)
            entry['img_file_id'] = img_file_id
            print(f"Screenshot uploaded to Google Drive: {img_file_id}")

    # Update the transcript file with the screenshot links
    updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
    update_file_on_drive(service, file_id, updated_transcript_text)
    print("Transcript updated, including screenshot links")
    return transcript

def process_youtube_link(link):
    # Fetch the transcript for the YouTube video
    # (the transcript ends up in the variable `transcript`)
    video_id = extract_youtube_id(link)
    download_youtube_video(video_id, output_path=OUTPUT_PATH)
    try:
        transcript = process_transcript_and_screenshots(video_id)
    except Exception as e:
        error_msg = f" {video_id} transcript error: {str(e)}"
        print("===process_youtube_link error===")
        print(error_msg)
        raise gr.Error(error_msg)

    formatted_transcript = []
    formatted_simple_transcript = []
    screenshot_paths = []
    for entry in transcript:
        start_time = format_seconds_to_time(entry['start'])
        end_time = format_seconds_to_time(entry['start'] + entry['duration'])
        embed_url = get_embedded_youtube_link(video_id, entry['start'])
        img_file_id = entry['img_file_id']
        screenshot_path = f"https://lh3.googleusercontent.com/d/{img_file_id}=s4000"
        line = {
            "start_time": start_time,
            "end_time": end_time,
            "text": entry['text'],
            "embed_url": embed_url,
            "screenshot_path": screenshot_path
        }
        formatted_transcript.append(line)
        # formatted_simple_transcript keeps only start_time, end_time and text
        simple_line = {
            "start_time": start_time,
            "end_time": end_time,
            "text": entry['text']
        }
        formatted_simple_transcript.append(simple_line)
        screenshot_paths.append(screenshot_path)

    global TRANSCRIPTS
    TRANSCRIPTS = formatted_transcript

    # Generate the other outputs derived from the transcript
    # questions = generate_questions(formatted_simple_transcript)
    questions = ["", "", ""]
    formatted_transcript_json = json.dumps(formatted_transcript, ensure_ascii=False, indent=2)
    summary_json = get_video_id_summary(video_id, formatted_simple_transcript)
    summary = summary_json["summary"]
    html_content = format_transcript_to_html(formatted_transcript)
    first_image = formatted_transcript[0]['screenshot_path']
    first_text = formatted_transcript[0]['text']

    # Make sure the return values match what the UI components expect
    return questions[0] if len(questions) > 0 else "", \
           questions[1] if len(questions) > 1 else "", \
           questions[2] if len(questions) > 2 else "", \
           formatted_transcript_json, \
           summary, \
           html_content, \
           first_image, \
           first_text

def format_transcript_to_html(formatted_transcript):
    html_content = ""
    for entry in formatted_transcript:
        html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
        html_content += f"<p>{entry['text']}</p>"
        html_content += f"<img src='{entry['screenshot_path']}' width='500px' />"
    return html_content

def get_embedded_youtube_link(video_id, start_time):
    embed_url = f"https://www.youtube.com/embed/{video_id}?start={start_time}&autoplay=1"
    return embed_url
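
# Illustrative example (not executed), with a hypothetical video ID:
#   get_embedded_youtube_link("abc123", 90) -> "https://www.youtube.com/embed/abc123?start=90&autoplay=1"
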
def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
    # Construct the full YouTube URL
    youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'

    # Create the output directory if it doesn't exist
    if not os.path.exists(output_path):
        os.makedirs(output_path)

    # Download the video
    yt = YouTube(youtube_url)
    video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    video_stream.download(output_path=output_path, filename=youtube_id + ".mp4")

    print(f"Video downloaded successfully: {output_path}/{youtube_id}.mp4")

def screenshot_youtube_video(youtube_id, snapshot_sec):
    video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'
    file_name = f"{youtube_id}_{snapshot_sec}.jpg"
    with VideoFileClip(video_path) as video:
        screenshot_path = f'{OUTPUT_PATH}/{file_name}'
        video.save_frame(screenshot_path, snapshot_sec)
    return screenshot_path

def process_web_link(link):
    # Fetch and parse the web page content
    response = requests.get(link)
    soup = BeautifulSoup(response.content, 'html.parser')
    return soup.get_text()

def preprocess_text(text):
    # Preprocess the text for use inside JSON
    json_text = text.replace("\n", "\\n")
    json_text = json_text.replace('"', '\\"')
    json_text = json_text.replace("'", "\\'")
    json_text = json_text.replace("\t", "\\t")
    json_text = json_text.replace("\r", "\\r")
    json_text = json_text.replace("\f", "\\f")
    json_text = json_text.replace("\b", "\\b")
    json_text = json_text.replace("\v", "\\v")
    json_text = json_text.replace(":", "\\:")
    return json_text
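
# Note (assumption, not used above): json.dumps(text, ensure_ascii=False) would cover the
# standard JSON escapes; the extra replacements for "'" and ":" go beyond what JSON requires.
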
# Get the content of video_id_summary.json
def get_video_id_summary(video_id, df_string):
    try:
        service = init_drive_service()
        parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
        folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
        file_name = f'{video_id}_summary.json'

        # Check whether the summary already exists on Drive
        exists, file_id = check_file_exists(service, folder_id, file_name)
        if not exists:
            summary = generate_summarise(df_string)
            processed_summary = preprocess_text(summary)
            summary_json = {"summary": processed_summary}
            summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
            file_id = upload_content_directly(service, file_name, folder_id, summary_text)
            print("Summary uploaded to Google Drive")
        else:
            # Summary already exists; download its content
            print("Summary already exists on Google Drive")
            summary_text = download_file_as_string(service, file_id)
            summary_json = json.loads(summary_text)
        return summary_json
    except Exception as e:
        error_msg = f" {video_id} summary error: {str(e)}"
        print("===get_video_id_summary error===")
        print(error_msg)
        raise gr.Error(error_msg)

def generate_summarise(df_string):
    # Use OpenAI to generate a summary of the uploaded data (prompt kept in zh-TW by design)
    sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,使用 zh-TW"
    user_content = f"""
    請根據 {df_string},判斷這份文本
    如果是資料類型,請提估欄位敘述、資料樣態與資料分析,告訴學生這張表的意義,以及可能的結論與對應方式
    如果是影片類型,請提估影片內容,告訴學生這部影片的意義,
    小範圍切出不同段落的相對應時間軸的重點摘要,最多不超過五段
    注意不要遺漏任何一段時間軸的內容
    格式為 【start - end】: 摘要
    以及可能的結論與結尾延伸小問題提供學生作反思

    整體格式為:
    1. 內容類型:?
    2. 整體摘要
    3. 條列式重點
    4. 關鍵時刻(段落摘要)
    5. 結論反思(為什麼我們要學這個?)
    6. 延伸小問題
    """

    # 🗂️ 1. 內容類型:?
    # 📚 2. 整體摘要
    # 🔖 3. 條列式重點
    # 🔑 4. 關鍵時刻(段落摘要)
    # 💡 5. 結論反思(為什麼我們要學這個?)
    # ❓ 6. 延伸小問題

    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000,
    }

    response = client.chat.completions.create(**request_payload)
    df_summarise = response.choices[0].message.content.strip()
    print("=====df_summarise=====")
    print(df_summarise)
    print("=====df_summarise=====")
    return df_summarise

def generate_questions(df_string):
    # Use OpenAI to generate questions based on the uploaded data (prompt kept in zh-TW by design)
    sys_content = "你是一個擅長資料分析跟影片教學的老師,user 為學生,請精讀資料文本,自行判斷資料的種類,並用既有資料為本質猜測用戶可能會問的問題,使用 zh-TW"
    user_content = f"請根據 {df_string} 生成三個問題,並用 JSON 格式返回 questions:[q1的敘述text, q2的敘述text, q3的敘述text]"
    messages = [
        {"role": "system", "content": sys_content},
        {"role": "user", "content": user_content}
    ]
    response_format = {"type": "json_object"}

    print("=====messages=====")
    print(messages)
    print("=====messages=====")

    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000,
        "response_format": response_format
    }

    response = client.chat.completions.create(**request_payload)
    questions = json.loads(response.choices[0].message.content)["questions"]
    print("=====json_response=====")
    print(questions)
    print("=====json_response=====")
    return questions

def get_questions(df_string):
    questions = generate_questions(df_string)
    q1 = questions[0] if len(questions) > 0 else ""
    q2 = questions[1] if len(questions) > 1 else ""
    q3 = questions[2] if len(questions) > 2 else ""
    print("=====get_questions=====")
    print(f"q1: {q1}")
    print(f"q2: {q2}")
    print(f"q3: {q3}")
    print("=====get_questions=====")
    return q1, q2, q3

def send_question(question, df_string_output, chat_history):
    # Called when a question button is clicked
    return respond(question, df_string_output, chat_history)

def respond(user_message, df_string_output, chat_history):
    print("=== variable: user_message ===")
    print(user_message)
    print("=== variable: chat_history ===")
    print(chat_history)

    # System prompt kept in zh-TW by design
    sys_content = f"""
    你是一個擅長資料分析跟影片教學的老師,user 為學生
    請用 {df_string_output} 為資料文本,自行判斷資料的種類,
    並進行對話,使用 zh-TW
    如果是影片類型,不用解釋逐字稿格式,直接回答學生問題
    請你用蘇格拉底式的提問方式,引導學生思考,並且給予學生一些提示
    不要直接給予答案,讓學生自己思考
    但可以給予一些提示跟引導,例如給予影片的時間軸,讓學生自己去找答案
    如果學生問了一些問題你無法判斷,請告訴學生你無法判斷,並建議學生可以問其他問題
    或者你可以問學生一些問題,幫助學生更好的理解資料
    如果學生的問題與資料文本無關,請告訴學生你無法回答超出範圍的問題
    """
    messages = [
        {"role": "system", "content": sys_content}
    ]

    # If chat_history is not None, append each turn to messages.
    # chat_history = [(user, assistant), (user, assistant), ...]
    # In each pair, the first element is the user message, the second the assistant reply.
    if chat_history is not None:
        # Keep only the last 10 turns if there are more than 10
        if len(chat_history) > 10:
            chat_history = chat_history[-10:]
        for chat in chat_history:
            old_messages = [
                {"role": "user", "content": chat[0]},
                {"role": "assistant", "content": chat[1]}
            ]
            messages += old_messages
    else:
        pass

    messages.append({"role": "user", "content": user_message})

    print("=====messages=====")
    print(messages)
    print("=====messages=====")

    request_payload = {
        "model": "gpt-4-1106-preview",
        "messages": messages,
        "max_tokens": 4000  # A large value; adjust as needed
    }

    response = client.chat.completions.create(**request_payload)
    print(response)
    response_text = response.choices[0].message.content.strip()

    # Update the chat history
    new_chat_history = (user_message, response_text)
    if chat_history is None:
        chat_history = [new_chat_history]
    else:
        chat_history.append(new_chat_history)

    # Return an empty string to clear the input box, plus the updated chat history
    return "", chat_history

def update_slide(direction):
    global TRANSCRIPTS
    global CURRENT_INDEX
    print("=== update slide ===")
    print(f"CURRENT_INDEX: {CURRENT_INDEX}")
    print(f"TRANSCRIPTS: {TRANSCRIPTS}")
    CURRENT_INDEX += direction
    if CURRENT_INDEX < 0:
        CURRENT_INDEX = 0  # Prevent the index from going below 0
    elif CURRENT_INDEX >= len(TRANSCRIPTS):
        CURRENT_INDEX = len(TRANSCRIPTS) - 1  # Prevent the index from going out of range
    # Get the text and screenshot URL of the current entry
    current_transcript = TRANSCRIPTS[CURRENT_INDEX]
    slide_image = current_transcript["screenshot_path"]
    slide_text = current_transcript["text"]
    return slide_image, slide_text

# Wrapper to handle the "Previous" button click
def prev_slide():
    return update_slide(-1)

# Wrapper to handle the "Next" button click
def next_slide():
    return update_slide(1)

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            file_upload = gr.File(label="Upload your CSV or Word file", visible=False)
            youtube_link = gr.Textbox(label="Enter YouTube Link")
            web_link = gr.Textbox(label="Enter Web Page Link", visible=False)
            chatbot = gr.Chatbot()
            msg = gr.Textbox(label="Message")
            send_button = gr.Button("Send")

        with gr.Column():
            with gr.Tab("截圖與逐字稿"):
                transcript_html = gr.HTML(label="YouTube Transcript and Video")
            with gr.Tab("投影片"):
                slide_image = gr.Image()
                slide_text = gr.Textbox()
                with gr.Row():
                    prev_button = gr.Button("Previous")
                    next_button = gr.Button("Next")
                prev_button.click(fn=prev_slide, inputs=[], outputs=[slide_image, slide_text])
                next_button.click(fn=next_slide, inputs=[], outputs=[slide_image, slide_text])
            with gr.Tab("資料本文"):
                df_string_output = gr.Textbox(lines=40, label="Data Text")
            with gr.Tab("重點整理"):
                df_summarise = gr.Textbox(container=True, show_copy_button=True, lines=40)
            with gr.Tab("常用問題"):
                gr.Markdown("## 常用問題")
                btn_1 = gr.Button()
                btn_2 = gr.Button()
                btn_3 = gr.Button()
                gr.Markdown("## 重新生成問題")
                btn_create_question = gr.Button("Create Questions")

    send_button.click(
        respond,
        inputs=[msg, df_string_output, chatbot],
        outputs=[msg, chatbot]
    )

    # Wire up the question-button click events
    btn_1.click(respond, inputs=[btn_1, df_string_output, chatbot], outputs=[msg, chatbot])
    btn_2.click(respond, inputs=[btn_2, df_string_output, chatbot], outputs=[msg, chatbot])
    btn_3.click(respond, inputs=[btn_3, df_string_output, chatbot], outputs=[msg, chatbot])
    btn_create_question.click(get_questions, inputs=[df_string_output], outputs=[btn_1, btn_2, btn_3])

    # file_upload.change(process_file, inputs=file_upload, outputs=df_string_output)
    file_upload.change(process_file, inputs=file_upload, outputs=[btn_1, btn_2, btn_3, df_summarise, df_string_output])

    # Triggered when a YouTube link is entered
    youtube_link.change(process_youtube_link, inputs=youtube_link, outputs=[btn_1, btn_2, btn_3, df_string_output, df_summarise, transcript_html, slide_image, slide_text])

    # Triggered when a web page link is entered
    # (process_web_link returns only the page text, so only df_string_output is updated)
    web_link.change(process_web_link, inputs=web_link, outputs=[df_string_output])

    # Initialize the slide view if transcripts are already loaded
    if TRANSCRIPTS:  # Make sure the list is not empty
        first_screenshot_path, first_text = update_slide(0)
        slide_image.update(value=first_screenshot_path)
        slide_text.update(value=first_text)

demo.launch(allowed_paths=["videos"])