# video_bot_999 / app.py
import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
import os
from openai import OpenAI
from groq import Groq
import uuid
from gtts import gTTS
import math
from pydub import AudioSegment
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import NoTranscriptFound
import yt_dlp
from moviepy.editor import VideoFileClip
from pytube import YouTube
import io
import time
import json
from datetime import timedelta
from urllib.parse import urlparse, parse_qs
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.http import MediaIoBaseUpload
from educational_material import EducationalMaterial
from storage_service import GoogleCloudStorage
import boto3
from chatbot import Chatbot
is_env_local = os.getenv("IS_ENV_LOCAL", "false") == "true"
print(f"is_env_local: {is_env_local}")
print("===gr__version__===")
print(gr.__version__)
# KEY CONFIG
if is_env_local:
with open("local_config.json") as f:
config = json.load(f)
PASSWORD = config["PASSWORD"]
GCS_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
DRIVE_KEY = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
OPEN_AI_KEY = config["OPEN_AI_KEY"]
OPEN_AI_ASSISTANT_ID_GPT4_BOT1 = config["OPEN_AI_ASSISTANT_ID_GPT4_BOT1"]
OPEN_AI_ASSISTANT_ID_GPT3_BOT1 = config["OPEN_AI_ASSISTANT_ID_GPT3_BOT1"]
OPEN_AI_KEY_BOT2 = config["OPEN_AI_KEY_BOT2"]
OPEN_AI_ASSISTANT_ID_GPT4_BOT2 = config["OPEN_AI_ASSISTANT_ID_GPT4_BOT2"]
OPEN_AI_ASSISTANT_ID_GPT3_BOT2 = config["OPEN_AI_ASSISTANT_ID_GPT3_BOT2"]
GROQ_API_KEY = config["GROQ_API_KEY"]
JUTOR_CHAT_KEY = config["JUTOR_CHAT_KEY"]
AWS_ACCESS_KEY = config["AWS_ACCESS_KEY"]
AWS_SECRET_KEY = config["AWS_SECRET_KEY"]
AWS_REGION_NAME = config["AWS_REGION_NAME"]
OUTPUT_PATH = config["OUTPUT_PATH"]
else:
PASSWORD = os.getenv("PASSWORD")
GCS_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
DRIVE_KEY = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
OPEN_AI_ASSISTANT_ID_GPT4_BOT1 = os.getenv("OPEN_AI_ASSISTANT_ID_GPT4_BOT1")
OPEN_AI_ASSISTANT_ID_GPT3_BOT1 = os.getenv("OPEN_AI_ASSISTANT_ID_GPT3_BOT1")
OPEN_AI_KEY_BOT2 = os.getenv("OPEN_AI_KEY_BOT2")
OPEN_AI_ASSISTANT_ID_GPT4_BOT2 = os.getenv("OPEN_AI_ASSISTANT_ID_GPT4_BOT2")
OPEN_AI_ASSISTANT_ID_GPT3_BOT2 = os.getenv("OPEN_AI_ASSISTANT_ID_GPT3_BOT2")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
JUTOR_CHAT_KEY = os.getenv("JUTOR_CHAT_KEY")
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_REGION_NAME = 'us-west-2'
OUTPUT_PATH = 'videos'
TRANSCRIPTS = []
CURRENT_INDEX = 0
CHAT_LIMIT = 10
# CLIENTS CONFIG
GROQ_CLIENT = Groq(api_key=GROQ_API_KEY)
GCS_SERVICE = GoogleCloudStorage(GCS_KEY)
GCS_CLIENT = GCS_SERVICE.client
BEDROCK_CLIENT = boto3.client(
service_name="bedrock-runtime",
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_KEY,
region_name=AWS_REGION_NAME,
)
# Check OpenAI API access
def check_open_ai_access(open_ai_api_key):
    # Send a minimal chat completion with the given key; return True if it succeeds, False otherwise.
client = OpenAI(api_key=open_ai_api_key)
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "This is a test."},
],
)
if response.choices[0].message.content:
return True
else:
return False
except Exception as e:
print(f"Error: {str(e)}")
return False
open_ai_api_key_assistant_id_list = [
{"account":"bot1", "open_ai_api_key": OPEN_AI_KEY, "assistant_gpt4_id": OPEN_AI_ASSISTANT_ID_GPT4_BOT1, "assistant_gpt3_id": OPEN_AI_ASSISTANT_ID_GPT3_BOT1},
{"account":"bot2", "open_ai_api_key": OPEN_AI_KEY_BOT2, "assistant_gpt4_id": OPEN_AI_ASSISTANT_ID_GPT4_BOT2, "assistant_gpt3_id": OPEN_AI_ASSISTANT_ID_GPT3_BOT2},
]
for open_ai_api_key_assistant_id in open_ai_api_key_assistant_id_list:
account = open_ai_api_key_assistant_id["account"]
open_ai_api_key = open_ai_api_key_assistant_id["open_ai_api_key"]
if check_open_ai_access(open_ai_api_key):
OPEN_AI_CLIENT = OpenAI(api_key=open_ai_api_key)
OPEN_AI_ASSISTANT_ID_GPT4 = open_ai_api_key_assistant_id["assistant_gpt4_id"]
OPEN_AI_ASSISTANT_ID_GPT3 = open_ai_api_key_assistant_id["assistant_gpt3_id"]
print(f"OpenAI access is OK, account: {account}")
break
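else:
    # Defensive sketch: if no account passes the check above, OPEN_AI_CLIENT and the assistant IDs
    # are never set and later calls would fail with a NameError, so fail fast here instead.
    raise RuntimeError("No working OpenAI API key found in open_ai_api_key_assistant_id_list")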
# Verify the password
def verify_password(password):
if password == PASSWORD:
return True
else:
raise gr.Error("ๅฏ†็ขผ้Œฏ่ชค")
# ==== Google Drive initialization ====
def init_drive_service():
credentials_json_string = DRIVE_KEY
credentials_dict = json.loads(credentials_json_string)
SCOPES = ['https://www.googleapis.com/auth/drive']
credentials = service_account.Credentials.from_service_account_info(
credentials_dict, scopes=SCOPES)
service = build('drive', 'v3', credentials=credentials)
return service
def create_folder_if_not_exists(service, folder_name, parent_id):
print("ๆฃ€ๆŸฅๆ˜ฏๅฆๅญ˜ๅœจ็‰นๅฎšๅ็งฐ็š„ๆ–‡ไปถๅคน๏ผŒๅฆ‚ๆžœไธๅญ˜ๅœจๅˆ™ๅˆ›ๅปบ")
query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and '{parent_id}' in parents and trashed=false"
response = service.files().list(q=query, spaces='drive', fields="files(id, name)").execute()
folders = response.get('files', [])
if not folders:
# ๆ–‡ไปถๅคนไธๅญ˜ๅœจ๏ผŒๅˆ›ๅปบๆ–ฐๆ–‡ไปถๅคน
file_metadata = {
'name': folder_name,
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent_id]
}
folder = service.files().create(body=file_metadata, fields='id').execute()
return folder.get('id')
else:
        # Folder already exists
return folders[0]['id']
# Check whether a file exists on Google Drive
def check_file_exists(service, folder_name, file_name):
query = f"name = '{file_name}' and '{folder_name}' in parents and trashed = false"
response = service.files().list(q=query).execute()
files = response.get('files', [])
return len(files) > 0, files[0]['id'] if files else None
def upload_content_directly(service, file_name, folder_id, content):
"""
็›ดๆŽฅๅฐ†ๅ†…ๅฎนไธŠไผ ๅˆฐGoogle Driveไธญ็š„ๆ–ฐๆ–‡ไปถใ€‚
"""
if not file_name:
raise ValueError("ๆ–‡ไปถๅไธ่ƒฝไธบ็ฉบ")
if not folder_id:
raise ValueError("ๆ–‡ไปถๅคนIDไธ่ƒฝไธบ็ฉบ")
    if content is None:  # allow empty strings to be uploaded, but not None
raise ValueError("ๅ†…ๅฎนไธ่ƒฝไธบ็ฉบ")
file_metadata = {'name': file_name, 'parents': [folder_id]}
    # Create an in-memory file object for the text content with io.BytesIO
try:
with io.BytesIO(content.encode('utf-8')) as fh:
media = MediaIoBaseUpload(fh, mimetype='text/plain', resumable=True)
print("==content==")
print(content)
print("==content==")
print("==media==")
print(media)
print("==media==")
# ๆ‰ง่กŒไธŠไผ 
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
return file.get('id')
except Exception as e:
print(f"ไธŠไผ ๆ–‡ไปถๆ—ถๅ‘็”Ÿ้”™่ฏฏ: {e}")
        raise  # re-raise so the caller can handle or ignore it
def upload_file_directly(service, file_name, folder_id, file_path):
    # Upload a .json file to Google Drive
file_metadata = {'name': file_name, 'parents': [folder_id]}
media = MediaFileUpload(file_path, mimetype='application/json')
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
# return file.get('id') # ่ฟ”ๅ›žๆ–‡ไปถID
return True
def upload_img_directly(service, file_name, folder_id, file_path):
file_metadata = {'name': file_name, 'parents': [folder_id]}
media = MediaFileUpload(file_path, mimetype='image/jpeg')
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
return file.get('id') # ่ฟ”ๅ›žๆ–‡ไปถID
def download_file_as_string(service, file_id):
"""
ไปŽGoogle Driveไธ‹่ฝฝๆ–‡ไปถๅนถๅฐ†ๅ…ถไฝœไธบๅญ—็ฌฆไธฒ่ฟ”ๅ›žใ€‚
"""
request = service.files().get_media(fileId=file_id)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
    while not done:
status, done = downloader.next_chunk()
fh.seek(0)
content = fh.read().decode('utf-8')
return content
def set_public_permission(service, file_id):
service.permissions().create(
fileId=file_id,
body={"type": "anyone", "role": "reader"},
fields='id',
).execute()
def update_file_on_drive(service, file_id, file_content):
"""
ๆ›ดๆ–ฐGoogle DriveไธŠ็š„ๆ–‡ไปถๅ†…ๅฎนใ€‚
ๅ‚ๆ•ฐ:
- service: Google Drive APIๆœๅŠกๅฎžไพ‹ใ€‚
- file_id: ่ฆๆ›ดๆ–ฐ็š„ๆ–‡ไปถ็š„IDใ€‚
- file_content: ๆ–ฐ็š„ๆ–‡ไปถๅ†…ๅฎน๏ผŒๅญ—็ฌฆไธฒๆ ผๅผใ€‚
"""
# ๅฐ†ๆ–ฐ็š„ๆ–‡ไปถๅ†…ๅฎน่ฝฌๆขไธบๅญ—่Š‚ๆต
fh = io.BytesIO(file_content.encode('utf-8'))
media = MediaIoBaseUpload(fh, mimetype='application/json', resumable=True)
# ๆ›ดๆ–ฐๆ–‡ไปถ
updated_file = service.files().update(
fileId=file_id,
media_body=media
).execute()
print(f"ๆ–‡ไปถๅทฒๆ›ดๆ–ฐ๏ผŒๆ–‡ไปถID: {updated_file['id']}")
# ---- Text file ----
def process_file(password, file):
    verify_password(password)
    # Read the uploaded file into plain text
    if file.name.endswith('.csv'):
        df = pd.read_csv(file)
        text = df_to_text(df)
    elif file.name.endswith('.xlsx'):
        df = pd.read_excel(file)
        text = df_to_text(df)
    elif file.name.endswith('.docx'):
        text = docx_to_text(file)
    else:
        raise ValueError("Unsupported file type")
    # Use the extracted text for every file type (a .docx has no DataFrame to stringify)
    df_string = text
    # Yilan: replace the @XX@ marker with |
    df_string = df_string.replace("@XX@", "|")
    # Generate questions and a summary from the uploaded content
    questions = generate_questions(df_string)
    summary = generate_summarise(df_string)
    # Return the question button texts and the data string
return questions[0] if len(questions) > 0 else "", \
questions[1] if len(questions) > 1 else "", \
questions[2] if len(questions) > 2 else "", \
summary, \
df_string
def df_to_text(df):
    # Convert a DataFrame to plain text
    return df.to_string()
def docx_to_text(file):
    # Convert a Word document to plain text
    doc = Document(file)
    return "\n".join([para.text for para in doc.paragraphs])
# ---- YouTube link ----
def parse_time(time_str):
"""ๅฐ‡ๆ™‚้–“ๅญ—็ฌฆไธฒ 'HH:MM:SS' ๆˆ– 'MM:SS' ่ฝ‰ๆ›็‚บ timedelta ็‰ฉไปถใ€‚"""
parts = list(map(int, time_str.split(':')))
if len(parts) == 3:
hours, minutes, seconds = parts
elif len(parts) == 2:
        hours = 0  # no hours part, default to 0
minutes, seconds = parts
else:
raise ValueError("ๆ™‚้–“ๆ ผๅผไธๆญฃ็ขบ๏ผŒๆ‡‰็‚บ 'HH:MM:SS' ๆˆ– 'MM:SS'")
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
def format_seconds_to_time(seconds):
"""ๅฐ†็ง’ๆ•ฐๆ ผๅผๅŒ–ไธบ ๆ—ถ:ๅˆ†:็ง’ ็š„ๅฝขๅผ"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02}:{minutes:02}:{seconds:02}"
def extract_youtube_id(url):
parsed_url = urlparse(url)
if "youtube.com" in parsed_url.netloc:
        # Standard links: the video ID is in the 'v' query parameter
query_params = parse_qs(parsed_url.query)
return query_params.get("v")[0] if "v" in query_params else None
elif "youtu.be" in parsed_url.netloc:
        # Short youtu.be links: the video ID is part of the path
return parsed_url.path.lstrip('/')
else:
return None
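# Illustrative calls (with a placeholder video id):
#   extract_youtube_id("https://www.youtube.com/watch?v=abc123") -> "abc123"
#   extract_youtube_id("https://youtu.be/abc123")                -> "abc123"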
def get_transcript_by_yt_api(video_id):
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
languages = []
for t in transcript_list:
languages.append(t.language_code)
for language in languages:
try:
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
print("===transcript===")
print(transcript)
print("===transcript===")
            return transcript  # transcript found, return it
        except NoTranscriptFound:
            continue  # no transcript for this language, try the next one
    return None  # every language failed, return None
def generate_transcription_by_whisper(video_id):
youtube_url = f'https://www.youtube.com/watch?v={video_id}'
codec_name = "mp3"
outtmpl = f"{OUTPUT_PATH}/{video_id}.%(ext)s"
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': codec_name,
'preferredquality': '192'
}],
'outtmpl': outtmpl,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([youtube_url])
audio_path = f"{OUTPUT_PATH}/{video_id}.{codec_name}"
full_audio = AudioSegment.from_mp3(audio_path)
max_part_duration = 10 * 60 * 1000 # 10 minutes
full_duration = len(full_audio) # in milliseconds
parts = math.ceil(full_duration / max_part_duration)
print(f"parts: {parts}")
transcription = []
for i in range(parts):
print(f"== i: {i}==")
start_time = i * max_part_duration
end_time = min((i + 1) * max_part_duration, full_duration)
print(f"time: {start_time/1000} - {end_time/1000}")
chunk = full_audio[start_time:end_time]
chunk_path = f"{OUTPUT_PATH}/{video_id}_part_{i}.{codec_name}"
chunk.export(chunk_path, format=codec_name)
try:
with open(chunk_path, "rb") as chunk_file:
response = OPEN_AI_CLIENT.audio.transcriptions.create(
model="whisper-1",
file=chunk_file,
response_format="verbose_json",
timestamp_granularities=["segment"],
prompt="Transcribe the following audio file. if content is chinese, please using 'language: zh-TW' ",
)
# Adjusting the timestamps for the chunk based on its position in the full audio
adjusted_segments = [{
'text': segment['text'],
'start': math.ceil(segment['start'] + start_time / 1000.0), # Converting milliseconds to seconds
'end': math.ceil(segment['end'] + start_time / 1000.0),
'duration': math.ceil(segment['end'] - segment['start'])
} for segment in response.segments]
transcription.extend(adjusted_segments)
except Exception as e:
print(f"Error processing chunk {i}: {str(e)}")
# Remove temporary chunk files after processing
os.remove(chunk_path)
return transcription
def get_video_duration(video_id):
yt = YouTube(f'https://www.youtube.com/watch?v={video_id}')
try:
video_duration = yt.length
    except Exception:
video_duration = None
print(f"video_duration: {video_duration}")
return video_duration
def process_transcript_and_screenshots_on_gcs(video_id):
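    """Fetch or build the transcript for a YouTube video on GCS, make sure every entry
    has a screenshot uploaded, and return the up-to-date transcript list."""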
print("====process_transcript_and_screenshots_on_gcs====")
# GCS
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
    # Transcript file name
transcript_file_name = f'{video_id}_transcript.json'
transcript_blob_name = f"{video_id}/{transcript_file_name}"
# ๆฃ€ๆŸฅ้€ๅญ—็จฟๆ˜ฏๅฆๅญ˜ๅœจ
is_new_transcript = False
is_transcript_exists = GCS_SERVICE.check_file_exists(bucket_name, transcript_blob_name)
video_duration = get_video_duration(video_id)
if not is_transcript_exists:
print("้€ๅญ—็จฟๆ–‡ไปถไธๅญ˜ๅœจไบŽGCSไธญ๏ผŒ้‡ๆ–ฐๅปบ็ซ‹")
        # Fetch the transcript from YouTube and upload it
try:
transcript = get_transcript_by_yt_api(video_id)
except:
# call open ai whisper
print("===call open ai whisper===")
transcript = generate_transcription_by_whisper(video_id)
if transcript:
print("ๆˆๅŠŸ็ฒๅ–ๅญ—ๅน•")
else:
print("ๆฒ’ๆœ‰ๆ‰พๅˆฐๅญ—ๅน•")
transcript = generate_transcription_by_whisper(video_id)
if video_duration:
transcript = [entry for entry in transcript if entry['start'] <= video_duration]
transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, transcript_blob_name, transcript_text)
is_new_transcript = True
else:
        # Transcript already exists; download its contents
print("้€ๅญ—็จฟๅทฒๅญ˜ๅœจไบŽGCSไธญ")
transcript_text = GCS_SERVICE.download_as_string(bucket_name, transcript_blob_name)
transcript = json.loads(transcript_text)
if video_duration:
transcript = [entry for entry in transcript if entry['start'] <= video_duration]
# print("===็ขบ่ชๅ…ถไป–่ก็”Ÿๆ–‡ไปถ===")
# source = "gcs"
# get_questions(video_id, transcript_text, source)
# get_video_id_summary(video_id, transcript_text, source)
# get_mind_map(video_id, transcript_text, source)
# print("===็ขบ่ชๅ…ถไป–่ก็”Ÿๆ–‡ไปถ end ===")
# ่™•็†ๆˆชๅœ–
for entry in transcript:
if 'img_file_id' not in entry:
            # Check whether OUTPUT_PATH already contains {video_id}.mp4
video_path = f'{OUTPUT_PATH}/{video_id}.mp4'
if not os.path.exists(video_path):
                # Try up to 5 times; raise if every attempt fails
for i in range(5):
try:
download_youtube_video(video_id)
break
except Exception as e:
if i == 4:
raise gr.Error(f"ไธ‹่ฝฝ่ง†้ข‘ๅคฑ่ดฅ: {str(e)}")
time.sleep(5)
            # Take the screenshot
screenshot_path = screenshot_youtube_video(video_id, entry['start'])
screenshot_blob_name = f"{video_id}/{video_id}_{entry['start']}.jpg"
img_file_id = GCS_SERVICE.upload_image_and_get_public_url(bucket_name, screenshot_blob_name, screenshot_path)
entry['img_file_id'] = img_file_id
print(f"ๆˆชๅ›พๅทฒไธŠไผ ๅˆฐGCS: {img_file_id}")
is_new_transcript = True
# ็ขบ่ชๆ˜ฏๅฆๆ›ดๆ–ฐ้€ๅญ—็จฟๆ–‡ไปถ
if is_new_transcript:
# ๆ›ดๆ–ฐ้€ๅญ—็จฟๆ–‡ไปถ
print("===ๆ›ดๆ–ฐ้€ๅญ—็จฟๆ–‡ไปถ===")
print(transcript)
print("===ๆ›ดๆ–ฐ้€ๅญ—็จฟๆ–‡ไปถ===")
updated_transcript_text = json.dumps(transcript, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, transcript_blob_name, updated_transcript_text)
print("้€ๅญ—็จฟๅทฒๆ›ดๆ–ฐ๏ผŒๅŒ…ๆ‹ฌๆˆชๅ›พ้“พๆŽฅ")
updated_transcript_json = json.loads(updated_transcript_text)
else:
updated_transcript_json = transcript
return updated_transcript_json
def process_youtube_link(password, link):
verify_password(password)
    # Build (or fetch) the transcript and screenshots for this video from GCS
video_id = extract_youtube_id(link)
try:
transcript = process_transcript_and_screenshots_on_gcs(video_id)
except Exception as e:
error_msg = f" {video_id} ้€ๅญ—็จฟ้Œฏ่ชค: {str(e)}"
print("===process_youtube_link error===")
print(error_msg)
raise gr.Error(error_msg)
original_transcript = json.dumps(transcript, ensure_ascii=False, indent=2)
formatted_transcript = []
formatted_simple_transcript =[]
for entry in transcript:
start_time = format_seconds_to_time(entry['start'])
end_time = format_seconds_to_time(entry['start'] + entry['duration'])
embed_url = get_embedded_youtube_link(video_id, entry['start'])
img_file_id = entry['img_file_id']
screenshot_path = img_file_id
line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text'],
"embed_url": embed_url,
"screenshot_path": screenshot_path
}
formatted_transcript.append(line)
        # formatted_simple_transcript only keeps start_time, end_time and text
simple_line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text']
}
formatted_simple_transcript.append(simple_line)
global TRANSCRIPTS
TRANSCRIPTS = formatted_transcript
# ๅŸบไบŽ้€ๅญ—็จฟ็”Ÿๆˆๅ…ถไป–ๆ‰€้œ€็š„่พ“ๅ‡บ
source = "gcs"
questions_answers = get_questions_answers(video_id, formatted_simple_transcript, source)
questions_answers_json = json.dumps(questions_answers, ensure_ascii=False, indent=2)
summary_json = get_video_id_summary(video_id, formatted_simple_transcript, source)
summary_text = summary_json["summary"]
summary = summary_json["summary"]
key_moments_json = get_key_moments(video_id, formatted_simple_transcript, formatted_transcript, source)
key_moments = key_moments_json["key_moments"]
key_moments_text = json.dumps(key_moments, ensure_ascii=False, indent=2)
key_moments_html = get_key_moments_html(key_moments)
html_content = format_transcript_to_html(formatted_transcript)
simple_html_content = format_simple_transcript_to_html(formatted_simple_transcript)
first_image = formatted_transcript[0]['screenshot_path']
# first_image = "https://www.nameslook.com/names/dfsadf-nameslook.png"
first_text = formatted_transcript[0]['text']
mind_map_json = get_mind_map(video_id, formatted_simple_transcript, source)
mind_map = mind_map_json["mind_map"]
mind_map_html = get_mind_map_html(mind_map)
reading_passage_json = get_reading_passage(video_id, formatted_simple_transcript, source)
reading_passage_text = reading_passage_json["reading_passage"]
reading_passage = reading_passage_json["reading_passage"]
meta_data = get_meta_data(video_id)
subject = meta_data["subject"]
grade = meta_data["grade"]
# ็กฎไฟ่ฟ”ๅ›žไธŽ UI ็ป„ไปถ้ข„ๆœŸๅŒน้…็š„่พ“ๅ‡บ
return video_id, \
questions_answers_json, \
original_transcript, \
summary_text, \
summary, \
key_moments_text, \
key_moments_html, \
mind_map, \
mind_map_html, \
html_content, \
simple_html_content, \
first_image, \
first_text, \
reading_passage_text, \
reading_passage, \
subject, \
grade
def create_formatted_simple_transcript(transcript):
formatted_simple_transcript = []
for entry in transcript:
start_time = format_seconds_to_time(entry['start'])
end_time = format_seconds_to_time(entry['start'] + entry['duration'])
line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text']
}
formatted_simple_transcript.append(line)
return formatted_simple_transcript
def create_formatted_transcript(video_id, transcript):
formatted_transcript = []
for entry in transcript:
start_time = format_seconds_to_time(entry['start'])
end_time = format_seconds_to_time(entry['start'] + entry['duration'])
embed_url = get_embedded_youtube_link(video_id, entry['start'])
img_file_id = entry['img_file_id']
screenshot_path = img_file_id
line = {
"start_time": start_time,
"end_time": end_time,
"text": entry['text'],
"embed_url": embed_url,
"screenshot_path": screenshot_path
}
formatted_transcript.append(line)
return formatted_transcript
def format_transcript_to_html(formatted_transcript):
html_content = ""
for entry in formatted_transcript:
html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
html_content += f"<p>{entry['text']}</p>"
html_content += f"<img src='{entry['screenshot_path']}' width='500px' />"
return html_content
def format_simple_transcript_to_html(formatted_transcript):
html_content = ""
for entry in formatted_transcript:
html_content += f"<h3>{entry['start_time']} - {entry['end_time']}</h3>"
html_content += f"<p>{entry['text']}</p>"
return html_content
def get_embedded_youtube_link(video_id, start_time):
int_start_time = int(start_time)
embed_url = f"https://www.youtube.com/embed/{video_id}?start={int_start_time}&autoplay=1"
return embed_url
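# Example: get_embedded_youtube_link("abc123", 65.0) returns
# "https://www.youtube.com/embed/abc123?start=65&autoplay=1" ("abc123" is a placeholder id).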
def download_youtube_video(youtube_id, output_path=OUTPUT_PATH):
# Construct the full YouTube URL
youtube_url = f'https://www.youtube.com/watch?v={youtube_id}'
# Create the output directory if it doesn't exist
if not os.path.exists(output_path):
os.makedirs(output_path)
# Download the video
try:
yt = YouTube(youtube_url)
video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
video_stream.download(output_path=output_path, filename=youtube_id+".mp4")
print(f"[Pytube] Video downloaded successfully: {output_path}/{youtube_id}.mp4")
except Exception as e:
ydl_opts = {
'format': "bestvideo[height<=720][ext=mp4]",
'outtmpl': os.path.join(output_path, f'{youtube_id}.mp4'), # Output filename template
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([youtube_url])
print(f"[yt_dlp] Video downloaded successfully: {output_path}/{youtube_id}.mp4")
def screenshot_youtube_video(youtube_id, snapshot_sec):
video_path = f'{OUTPUT_PATH}/{youtube_id}.mp4'
file_name = f"{youtube_id}_{snapshot_sec}.jpg"
with VideoFileClip(video_path) as video:
screenshot_path = f'{OUTPUT_PATH}/{file_name}'
video.save_frame(screenshot_path, snapshot_sec)
return screenshot_path
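# Note: this assumes {OUTPUT_PATH}/{youtube_id}.mp4 already exists; the caller
# (process_transcript_and_screenshots_on_gcs) downloads the video first when it is missing.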
# ---- Web ----
# def process_web_link(link):
# # ๆŠ“ๅ–ๅ’Œ่งฃๆž็ฝ‘้กตๅ†…ๅฎน
# response = requests.get(link)
# soup = BeautifulSoup(response.content, 'html.parser')
# return soup.get_text()
# ---- LLM Generator ----
def split_data(df_string, word_base=100000):
"""Split the JSON string based on a character length base and then chunk the parsed JSON array."""
if isinstance(df_string, str):
data_str_cnt = len(df_string)
data = json.loads(df_string)
else:
data_str_cnt = len(str(df_string))
data = df_string
# Calculate the number of parts based on the length of the string
n_parts = data_str_cnt // word_base + (1 if data_str_cnt % word_base != 0 else 0)
print(f"Number of Parts: {n_parts}")
# Calculate the number of elements each part should have
part_size = len(data) // n_parts if n_parts > 0 else len(data)
segments = []
for i in range(n_parts):
start_idx = i * part_size
end_idx = min((i + 1) * part_size, len(data))
# Serialize the segment back to a JSON string
segment = json.dumps(data[start_idx:end_idx])
segments.append(segment)
return segments
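# Illustrative example: a transcript whose JSON string is about 250,000 characters long is
# split into 3 segments with word_base=100000, each segment re-serialized via json.dumps.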
def generate_content_by_LLM(sys_content, user_content, response_format=None):
    # Generate content with OpenAI; fall back to Bedrock (Claude) if the call fails
try:
model = "gpt-4-turbo"
        # Build the chat messages for the OpenAI request
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
        request_payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 4000,
        }
        # Only attach response_format when one was requested
        if response_format is not None:
            request_payload["response_format"] = response_format
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
content = response.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error generating content with OpenAI: {str(e)}")
        print("falling back to Bedrock")
        # Generate the content with Bedrock (Anthropic Claude) instead
messages = [
{"role": "user", "content": user_content}
]
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
# model_id = "anthropic.claude-3-haiku-20240307-v1:0"
kwargs = {
"modelId": model_id,
"contentType": "application/json",
"accept": "application/json",
"body": json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4000,
"system": sys_content,
"messages": messages
})
}
response = BEDROCK_CLIENT.invoke_model(**kwargs)
response_body = json.loads(response.get('body').read())
content = response_body.get('content')[0].get('text')
print("=====content=====")
print(content)
print("=====content=====")
return content
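# Usage sketch: pass response_format={"type": "json_object"} when the caller expects JSON
# (as in generate_questions_answers / generate_key_moments); omit it for free-form text.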
def get_reading_passage(video_id, df_string, source):
if source == "gcs":
print("===get_reading_passage on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_reading_passage_latex.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the reading_passage file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
reading_passage = generate_reading_passage(df_string)
reading_passage_json = {"reading_passage": str(reading_passage)}
reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, reading_passage_text)
print("reading_passageๅทฒไธŠไผ ๅˆฐGCS")
else:
            # reading_passage already exists; download it
print("reading_passageๅทฒๅญ˜ๅœจไบŽGCSไธญ")
reading_passage_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
reading_passage_json = json.loads(reading_passage_text)
elif source == "drive":
print("===get_reading_passage on drive===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_reading_passage.json'
# ๆฃ€ๆŸฅ reading_passage ๆ˜ฏๅฆๅญ˜ๅœจ
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
reading_passage = generate_reading_passage(df_string)
reading_passage_json = {"reading_passage": str(reading_passage)}
reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, reading_passage_text)
print("reading_passageๅทฒไธŠๅ‚ณๅˆฐGoogle Drive")
else:
# reading_passageๅทฒๅญ˜ๅœจ๏ผŒไธ‹่ฝฝๅ†…ๅฎน
print("reading_passageๅทฒๅญ˜ๅœจไบŽGoogle Driveไธญ")
reading_passage_text = download_file_as_string(service, file_id)
return reading_passage_json
def generate_reading_passage(df_string):
print("===generate_reading_passage===")
segments = split_data(df_string, word_base=100000)
all_content = []
for segment in segments:
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {segment}
ๆ–‡ๆœฌ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž
ๅนซๆˆ‘็ต„ๅˆๆˆ Reading Passage
ไธฆๆฝค็จฟ่ฎ“ๆ–‡ๅฅ้€š้ †
่ซ‹ไธ€ๅฎš่ฆไฝฟ็”จ็น้ซ”ไธญๆ–‡ zh-TW๏ผŒไธฆ็”จๅฐ็ฃไบบ็š„ๅฃ่ชž
็”ข็”Ÿ็š„็ตๆžœไธ่ฆๅ‰ๅพŒๆ–‡่งฃ้‡‹๏ผŒไนŸไธ่ฆๆ•˜่ฟฐ้€™็ฏ‡ๆ–‡็ซ ๆ€Ž้บผ็”ข็”Ÿ็š„
ๅช้œ€่ฆๅฐˆๆณจๆไพ› Reading Passage๏ผŒๅญ—ๆ•ธๅœจ 500 ๅญ—ไปฅๅ…ง
ๆ•˜่ฟฐไธญ๏ผŒ่ซ‹ๆŠŠๆ•ธๅญธๆˆ–ๆ˜ฏๅฐˆๆฅญ่ก“่ชž๏ผŒ็”จ Latex ๅŒ…่ฆ†๏ผˆ$...$๏ผ‰๏ผŒไธฆไธ”ไธ่ฆๅŽปๆ”นๅŽŸๆœฌ็š„ๆ–‡็ซ 
ๅŠ ๆธ›ไน˜้™คใ€ๆ น่™Ÿใ€ๆฌกๆ–น็ญ‰็ญ‰็š„้‹็ฎ—ๅผๅฃ่ชžไนŸๆ›ๆˆ LATEX ๆ•ธๅญธ็ฌฆ่™Ÿ
่ซ‹็›ดๆŽฅ็ตฆๅ‡บๆ–‡็ซ ๏ผŒไธ็”จไป‹็ดนๆ€Ž้บผ่™•็†็š„ๆˆ–ๆ˜ฏๆ–‡็ซ ๅญ—ๆ•ธ็ญ‰็ญ‰
"""
content = generate_content_by_LLM(sys_content, user_content)
all_content.append(content + "\n")
# ๅฐ‡ๆ‰€ๆœ‰็”Ÿๆˆ็š„้–ฑ่ฎ€็†่งฃๆฎต่ฝๅˆไฝตๆˆไธ€ๅ€‹ๅฎŒๆ•ด็š„ๆ–‡็ซ 
final_content = "\n".join(all_content)
return final_content
def text_to_speech(video_id, text):
tts = gTTS(text, lang='en')
filename = f'{video_id}_reading_passage.mp3'
tts.save(filename)
return filename
def get_mind_map(video_id, df_string, source):
if source == "gcs":
print("===get_mind_map on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_mind_map.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
mind_map = generate_mind_map(df_string)
mind_map_json = {"mind_map": str(mind_map)}
mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, mind_map_text)
print("mind_mapๅทฒไธŠๅ‚ณๅˆฐGCS")
else:
            # mind_map already exists; download it
print("mind_mapๅทฒๅญ˜ๅœจไบŽGCSไธญ")
mind_map_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
mind_map_json = json.loads(mind_map_text)
elif source == "drive":
print("===get_mind_map on drive===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_mind_map.json'
# ๆฃ€ๆŸฅๆช”ๆกˆๆ˜ฏๅฆๅญ˜ๅœจ
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
mind_map = generate_mind_map(df_string)
mind_map_json = {"mind_map": str(mind_map)}
mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, mind_map_text)
print("mind_mapๅทฒไธŠๅ‚ณๅˆฐGoogle Drive")
else:
# mindmapๅทฒๅญ˜ๅœจ๏ผŒไธ‹่ฝฝๅ†…ๅฎน
print("mind_mapๅทฒๅญ˜ๅœจไบŽGoogle Driveไธญ")
mind_map_text = download_file_as_string(service, file_id)
mind_map_json = json.loads(mind_map_text)
return mind_map_json
def generate_mind_map(df_string):
print("===generate_mind_map===")
segments = split_data(df_string, word_base=100000)
all_content = []
for segment in segments:
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {segment} ๆ–‡ๆœฌๅปบ็ซ‹ markdown ๅฟƒๆ™บๅœ–
ๆณจๆ„๏ผšไธ้œ€่ฆๅ‰ๅพŒๆ–‡ๆ•˜่ฟฐ๏ผŒ็›ดๆŽฅ็ตฆๅ‡บ markdown ๆ–‡ๆœฌๅณๅฏ
้€™ๅฐๆˆ‘ๅพˆ้‡่ฆ
"""
content = generate_content_by_LLM(sys_content, user_content)
all_content.append(content + "\n")
# ๅฐ‡ๆ‰€ๆœ‰็”Ÿๆˆ็š„้–ฑ่ฎ€็†่งฃๆฎต่ฝๅˆไฝตๆˆไธ€ๅ€‹ๅฎŒๆ•ด็š„ๆ–‡็ซ 
final_content = "\n".join(all_content)
return final_content
def get_mind_map_html(mind_map):
mind_map_markdown = mind_map.replace("```markdown", "").replace("```", "")
mind_map_html = f"""
<div class="markmap">
<script type="text/template">
{mind_map_markdown}
</script>
</div>
"""
return mind_map_html
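# Note: the <div class="markmap"> wrapper above is only rendered as an interactive mind map
# if the surrounding page loads the markmap autoloader script (assumed to be included
# elsewhere in this app's HTML).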
def get_video_id_summary(video_id, df_string, source):
if source == "gcs":
print("===get_video_id_summary on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_summary_markdown.json'
summary_file_blob_name = f"{video_id}/{file_name}"
        # Check whether the summary file exists
is_summary_file_exists = GCS_SERVICE.check_file_exists(bucket_name, summary_file_blob_name)
if not is_summary_file_exists:
meta_data = get_meta_data(video_id)
summary = generate_summarise(df_string, meta_data)
summary_json = {"summary": str(summary)}
summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, summary_file_blob_name, summary_text)
print("summaryๅทฒไธŠไผ ๅˆฐGCS")
else:
            # summary already exists; download it
print("summaryๅทฒๅญ˜ๅœจไบŽGCSไธญ")
summary_text = GCS_SERVICE.download_as_string(bucket_name, summary_file_blob_name)
summary_json = json.loads(summary_text)
elif source == "drive":
print("===get_video_id_summary===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_summary.json'
# ๆฃ€ๆŸฅ้€ๅญ—็จฟๆ˜ฏๅฆๅญ˜ๅœจ
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
meta_data = get_meta_data(video_id)
summary = generate_summarise(df_string, meta_data)
summary_json = {"summary": str(summary)}
summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
try:
upload_content_directly(service, file_name, folder_id, summary_text)
print("summaryๅทฒไธŠๅ‚ณๅˆฐGoogle Drive")
except Exception as e:
error_msg = f" {video_id} ๆ‘˜่ฆ้Œฏ่ชค: {str(e)}"
print("===get_video_id_summary error===")
print(error_msg)
print("===get_video_id_summary error===")
else:
# ้€ๅญ—็จฟๅทฒๅญ˜ๅœจ๏ผŒไธ‹่ฝฝ้€ๅญ—็จฟๅ†…ๅฎน
print("summaryๅทฒๅญ˜ๅœจGoogle Driveไธญ")
summary_text = download_file_as_string(service, file_id)
summary_json = json.loads(summary_text)
return summary_json
def generate_summarise(df_string, metadata=None):
print("===generate_summarise===")
    # Generate a summary of the uploaded data with the LLM
if metadata:
title = metadata.get("title", "")
subject = metadata.get("subject", "")
grade = metadata.get("grade", "")
else:
title = ""
subject = ""
grade = ""
segments = split_data(df_string, word_base=100000)
all_content = []
for segment in segments:
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ชฒ็จ‹ๅ็จฑ๏ผš{title}
็ง‘็›ฎ๏ผš{subject}
ๅนด็ดš๏ผš{grade}
่ซ‹ๆ นๆ“šๅ…งๆ–‡๏ผš {segment}
ๆ ผๅผ็‚บ Markdown
ๅฆ‚ๆžœๆœ‰่ชฒ็จ‹ๅ็จฑ๏ผŒ่ซ‹ๅœ็นžใ€Œ่ชฒ็จ‹ๅ็จฑใ€็‚บๅญธ็ฟ’้‡้ปž๏ผŒ้€ฒ่กŒ้‡้ปžๆ•ด็†๏ผŒไธ่ฆๆ•ด็†่ทŸๆƒ…ๅขƒๆ•…ไบ‹็›ธ้—œ็š„ๅ•้กŒ
ๆ•ด้ซ”ๆ‘˜่ฆๅœจไธ€็™พๅญ—ไปฅๅ…ง
้‡้ปžๆฆ‚ๅฟตๅˆ—ๅ‡บ bullet points๏ผŒ่‡ณๅฐ‘ไธ‰ๅ€‹๏ผŒๆœ€ๅคšไบ”ๅ€‹
ไปฅๅŠๅฏ่ƒฝ็š„็ต่ซ–่ˆ‡็ตๅฐพๅปถไผธๅฐๅ•้กŒๆไพ›ๅญธ็”Ÿไฝœๅๆ€
ๆ•˜่ฟฐไธญ๏ผŒ่ซ‹ๆŠŠๆ•ธๅญธๆˆ–ๆ˜ฏๅฐˆๆฅญ่ก“่ชž๏ผŒ็”จ Latex ๅŒ…่ฆ†๏ผˆ$...$๏ผ‰
ๅŠ ๆธ›ไน˜้™คใ€ๆ น่™Ÿใ€ๆฌกๆ–น็ญ‰็ญ‰็š„้‹็ฎ—ๅผๅฃ่ชžไนŸๆ›ๆˆ LATEX ๆ•ธๅญธ็ฌฆ่™Ÿ
ๆ•ด้ซ”ๆ ผๅผ็‚บ๏ผš
## ๐ŸŒŸ ไธป้กŒ๏ผš{{title}} (ๅฆ‚ๆžœๆฒ’ๆœ‰ title ๅฐฑ็œ็•ฅ)
## ๐Ÿ“š ๆ•ด้ซ”ๆ‘˜่ฆ
- (ไธ€ๅ€‹ bullet point....)
## ๐Ÿ”– ้‡้ปžๆฆ‚ๅฟต
- xxx
- xxx
- xxx
## ๐Ÿ’ก ็‚บไป€้บผๆˆ‘ๅ€‘่ฆๅญธ้€™ๅ€‹๏ผŸ
- (ไธ€ๅ€‹ bullet point....)
## โ“ ๅปถไผธๅฐๅ•้กŒ
- (ไธ€ๅ€‹ bullet point....่ซ‹ๅœ็นžใ€Œ่ชฒ็จ‹ๅ็จฑใ€็‚บๅญธ็ฟ’้‡้ปž๏ผŒ้€ฒ่กŒ้‡้ปžๆ•ด็†๏ผŒไธ่ฆๆ•ด็†่ทŸๆƒ…ๅขƒๆ•…ไบ‹็›ธ้—œ็š„ๅ•้กŒ)
"""
content = generate_content_by_LLM(sys_content, user_content)
all_content.append(content + "\n")
if len(all_content) > 1:
all_content_cnt = len(all_content)
all_content_str = json.dumps(all_content)
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ›ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ›ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ชฒ็จ‹ๅ็จฑ๏ผš{title}
็ง‘็›ฎ๏ผš{subject}
ๅนด็ดš๏ผš{grade}
่ซ‹ๆ นๆ“šๅ…งๆ–‡๏ผš {all_content_str}
ๅ…ฑๆœ‰ {all_content_cnt} ๆฎต๏ผŒ่ซ‹็ธฑๆ•ดๆˆไธ€็ฏ‡ๆ‘˜่ฆ
ๆ ผๅผ็‚บ Markdown
ๅฆ‚ๆžœๆœ‰่ชฒ็จ‹ๅ็จฑ๏ผŒ่ซ‹ๅœ็นžใ€Œ่ชฒ็จ‹ๅ็จฑใ€็‚บๅญธ็ฟ’้‡้ปž๏ผŒ้€ฒ่กŒ้‡้ปžๆ•ด็†๏ผŒไธ่ฆๆ•ด็†่ทŸๆƒ…ๅขƒๆ•…ไบ‹็›ธ้—œ็š„ๅ•้กŒ
ๆ•ด้ซ”ๆ‘˜่ฆๅœจ {all_content_cnt} ็™พๅญ—ไปฅๅ…ง
้‡้ปžๆฆ‚ๅฟตๅˆ—ๅ‡บ bullet points๏ผŒ่‡ณๅฐ‘ไธ‰ๅ€‹๏ผŒๆœ€ๅคšๅๅ€‹
ไปฅๅŠๅฏ่ƒฝ็š„็ต่ซ–่ˆ‡็ตๅฐพๅปถไผธๅฐๅ•้กŒๆไพ›ๅญธ็”Ÿไฝœๅๆ€
ๆ•˜่ฟฐไธญ๏ผŒ่ซ‹ๆŠŠๆ•ธๅญธๆˆ–ๆ˜ฏๅฐˆๆฅญ่ก“่ชž๏ผŒ็”จ Latex ๅŒ…่ฆ†๏ผˆ$...$๏ผ‰
ๅŠ ๆธ›ไน˜้™คใ€ๆ น่™Ÿใ€ๆฌกๆ–น็ญ‰็ญ‰็š„้‹็ฎ—ๅผๅฃ่ชžไนŸๆ›ๆˆ LATEX ๆ•ธๅญธ็ฌฆ่™Ÿ
ๆ•ด้ซ”ๆ ผๅผ็‚บ๏ผš
## ๐ŸŒŸ ไธป้กŒ๏ผš{{title}} (ๅฆ‚ๆžœๆฒ’ๆœ‰ title ๅฐฑ็œ็•ฅ)
## ๐Ÿ“š ๆ•ด้ซ”ๆ‘˜่ฆ
- ( {all_content_cnt} ๅ€‹ bullet point....)
## ๐Ÿ”– ้‡้ปžๆฆ‚ๅฟต
- xxx
- xxx
- xxx
## ๐Ÿ’ก ็‚บไป€้บผๆˆ‘ๅ€‘่ฆๅญธ้€™ๅ€‹๏ผŸ
- ( {all_content_cnt} ๅ€‹ bullet point....)
## โ“ ๅปถไผธๅฐๅ•้กŒ
- ( {all_content_cnt} ๅ€‹ bullet point....่ซ‹ๅœ็นžใ€Œ่ชฒ็จ‹ๅ็จฑใ€็‚บๅญธ็ฟ’้‡้ปž๏ผŒ้€ฒ่กŒ้‡้ปžๆ•ด็†๏ผŒไธ่ฆๆ•ด็†่ทŸๆƒ…ๅขƒๆ•…ไบ‹็›ธ้—œ็š„ๅ•้กŒ)
"""
final_content = generate_content_by_LLM(sys_content, user_content)
else:
final_content = all_content[0]
return final_content
def get_questions(video_id, df_string, source="gcs"):
if source == "gcs":
        # Check whether {video_id}_questions.json already exists on GCS
print("===get_questions on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_questions.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the file exists
is_questions_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_questions_exists:
questions = generate_questions(df_string)
questions_text = json.dumps(questions, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, questions_text)
print("questionsๅทฒไธŠๅ‚ณๅˆฐGCS")
else:
            # Questions already exist; download them
print("questionsๅทฒๅญ˜ๅœจไบŽGCSไธญ")
questions_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
questions = json.loads(questions_text)
elif source == "drive":
# ๅŽป g drive ็ขบ่ชๆ˜ฏๆœ‰ๆœ‰ video_id_questions.json
print("===get_questions===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_questions.json'
# ๆฃ€ๆŸฅๆช”ๆกˆๆ˜ฏๅฆๅญ˜ๅœจ
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
questions = generate_questions(df_string)
questions_text = json.dumps(questions, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, questions_text)
print("questionsๅทฒไธŠๅ‚ณๅˆฐGoogle Drive")
else:
# ้€ๅญ—็จฟๅทฒๅญ˜ๅœจ๏ผŒไธ‹่ฝฝ้€ๅญ—็จฟๅ†…ๅฎน
print("questionsๅทฒๅญ˜ๅœจไบŽGoogle Driveไธญ")
questions_text = download_file_as_string(service, file_id)
questions = json.loads(questions_text)
q1 = questions[0] if len(questions) > 0 else ""
q2 = questions[1] if len(questions) > 1 else ""
q3 = questions[2] if len(questions) > 2 else ""
print("=====get_questions=====")
print(f"q1: {q1}")
print(f"q2: {q2}")
print(f"q3: {q3}")
print("=====get_questions=====")
return q1, q2, q3
def generate_questions(df_string):
print("===generate_questions===")
    # Generate questions about the uploaded data with OpenAI (Bedrock as fallback)
if isinstance(df_string, str):
df_string_json = json.loads(df_string)
else:
df_string_json = df_string
content_text = ""
for entry in df_string_json:
content_text += entry["text"] + "๏ผŒ"
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไธฆ็”จๆ—ขๆœ‰่ณ‡ๆ–™็‚บๆœฌ่ณช็Œœๆธฌ็”จๆˆถๅฏ่ƒฝๆœƒๅ•็š„ๅ•้กŒ๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {content_text} ็”Ÿๆˆไธ‰ๅ€‹ๅ•้กŒ๏ผŒไธฆ็”จ JSON ๆ ผๅผ่ฟ”ๅ›ž
ไธ€ๅฎš่ฆไฝฟ็”จ zh-TW๏ผŒ้€™้žๅธธ้‡่ฆ๏ผ
EXAMPLE:
{{
questions:
[q1็š„ๆ•˜่ฟฐtext, q2็š„ๆ•˜่ฟฐtext, q3็š„ๆ•˜่ฟฐtext]
}}
"""
try:
model = "gpt-4-turbo"
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
response_format = { "type": "json_object" }
print("=====messages=====")
print(messages)
print("=====messages=====")
request_payload = {
"model": model,
"messages": messages,
"max_tokens": 4000,
"response_format": response_format
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
questions = json.loads(response.choices[0].message.content)["questions"]
except:
messages = [
{"role": "user", "content": user_content}
]
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
# model_id = "anthropic.claude-3-haiku-20240307-v1:0"
kwargs = {
"modelId": model_id,
"contentType": "application/json",
"accept": "application/json",
"body": json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4000,
"system": sys_content,
"messages": messages
})
}
response = BEDROCK_CLIENT.invoke_model(**kwargs)
response_body = json.loads(response.get('body').read())
response_completion = response_body.get('content')[0].get('text')
questions = json.loads(response_completion)["questions"]
print("=====json_response=====")
print(questions)
print("=====json_response=====")
return questions
def get_questions_answers(video_id, df_string, source="gcs"):
if source == "gcs":
try:
print("===get_questions_answers on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_questions_answers.json'
blob_name = f"{video_id}/{file_name}"
# ๆฃ€ๆŸฅๆช”ๆกˆๆ˜ฏๅฆๅญ˜ๅœจ
is_questions_answers_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_questions_answers_exists:
questions_answers = generate_questions_answers(df_string)
questions_answers_text = json.dumps(questions_answers, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, questions_answers_text)
print("questions_answersๅทฒไธŠๅ‚ณๅˆฐGCS")
else:
# questions_answersๅทฒๅญ˜ๅœจ๏ผŒไธ‹่ฝฝๅ†…ๅฎน
print("questions_answersๅทฒๅญ˜ๅœจไบŽGCSไธญ")
questions_answers_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
questions_answers = json.loads(questions_answers_text)
except Exception as e:
print(f"Error getting questions_answers: {str(e)}")
questions = get_questions(video_id, df_string, source)
questions_answers = [{"question": q, "answer": ""} for q in questions]
return questions_answers
def generate_questions_answers(df_string):
print("===generate_questions_answers===")
segments = split_data(df_string, word_base=100000)
all_content = []
for segment in segments:
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {segment} ็”Ÿๆˆไธ‰ๅ€‹ๅ•้กŒ่ทŸ็ญ”ๆกˆ๏ผŒไธป่ฆ่ˆ‡ๅญธ็ง‘ๆœ‰้—œ๏ผŒไธ่ฆๅ•่ทŸๆƒ…็ฏ€ๆ•…ไบ‹็›ธ้—œ็š„ๅ•้กŒ
็ญ”ๆกˆ่ฆๅœจๆœ€ๅพŒๆจ™็คบๅ‡บ่™•ใ€ๅƒ่€ƒ๏ผš00:01:05ใ€‘๏ผŒ่ซ‹ๆ นๆ“šๆ™‚้–“่ปธ start_time ไพ†ๆจ™็คบ
่ซ‹็ขบไฟๅ•้กŒ่ทŸ็ญ”ๆกˆ้ƒฝๆ˜ฏ็น้ซ”ไธญๆ–‡ zh-TW
็ญ”ๆกˆไธ็”จๆ˜ฏๆจ™ๆบ–็ญ”ๆกˆ๏ผŒ่€Œๆ˜ฏๅธถๆœ‰ๅ•Ÿ็™ผๆ€ง็š„่˜‡ๆ ผๆ‹‰ๅบ•ๅผๅ•็ญ”๏ผŒ่ฎ“ๅญธ็”Ÿๆ€่€ƒๆœฌไพ†็š„ๅ•้กŒ๏ผŒไปฅๅŠ่ฉฒๅŽปๅƒ่€ƒ็š„ๆ™‚้–“้ปž
ไธฆ็”จ JSON ๆ ผๅผ่ฟ”ๅ›ž list ๏ผŒ่ซ‹ไธ€ๅฎš่ฆ็ตฆไธ‰ๅ€‹ๅ•้กŒ่ทŸ็ญ”ๆกˆ๏ผŒไธ”่ฆ่ฃๅœจไธ€ๅ€‹ list ่ฃก้ข
k-v pair ็š„ key ๆ˜ฏ question, value ๆ˜ฏ answer
EXAMPLE:
{{
"questions_answers":
[
{{question: q1็š„ๆ•˜่ฟฐtext, answer: q1็š„็ญ”ๆกˆtextใ€ๅƒ่€ƒ๏ผš00:01:05ใ€‘}},
{{question: q2็š„ๆ•˜่ฟฐtext, answer: q2็š„็ญ”ๆกˆtextใ€ๅƒ่€ƒ๏ผš00:32:05ใ€‘}},
{{question: q3็š„ๆ•˜่ฟฐtext, answer: q3็š„็ญ”ๆกˆtextใ€ๅƒ่€ƒ๏ผš01:03:35ใ€‘}}
]
}}
"""
response_format = { "type": "json_object" }
content = generate_content_by_LLM(sys_content, user_content, response_format)
content_json = json.loads(content)["questions_answers"]
all_content += content_json
print("=====all_content=====")
print(all_content)
print("=====all_content=====")
return all_content
def change_questions(password, df_string):
verify_password(password)
questions = generate_questions(df_string)
q1 = questions[0] if len(questions) > 0 else ""
q2 = questions[1] if len(questions) > 1 else ""
q3 = questions[2] if len(questions) > 2 else ""
print("=====get_questions=====")
print(f"q1: {q1}")
print(f"q2: {q2}")
print(f"q3: {q3}")
print("=====get_questions=====")
return q1, q2, q3
def get_key_moments(video_id, formatted_simple_transcript, formatted_transcript, source):
if source == "gcs":
print("===get_key_moments on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_key_moments.json'
blob_name = f"{video_id}/{file_name}"
        # Check whether the file exists
is_key_moments_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_key_moments_exists:
key_moments = generate_key_moments(formatted_simple_transcript, formatted_transcript)
key_moments_json = {"key_moments": key_moments}
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_text)
print("key_momentsๅทฒไธŠๅ‚ณๅˆฐGCS")
else:
            # key_moments already exist; download them
print("key_momentsๅทฒๅญ˜ๅœจไบŽGCSไธญ")
key_moments_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
key_moments_json = json.loads(key_moments_text)
            # Check whether every key moment already has keywords
print("===ๆชขๆŸฅ key_moments ๆ˜ฏๅฆๆœ‰ keywords===")
has_keywords_added = False
for key_moment in key_moments_json["key_moments"]:
if "keywords" not in key_moment:
transcript = key_moment["transcript"]
key_moment["keywords"] = generate_key_moments_keywords(transcript)
print("===keywords===")
print(key_moment["keywords"])
print("===keywords===")
has_keywords_added = True
if has_keywords_added:
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_text)
key_moments_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
key_moments_json = json.loads(key_moments_text)
elif source == "drive":
print("===get_key_moments on drive===")
service = init_drive_service()
parent_folder_id = '1GgI4YVs0KckwStVQkLa1NZ8IpaEMurkL'
folder_id = create_folder_if_not_exists(service, video_id, parent_folder_id)
file_name = f'{video_id}_key_moments.json'
# ๆฃ€ๆŸฅๆช”ๆกˆๆ˜ฏๅฆๅญ˜ๅœจ
exists, file_id = check_file_exists(service, folder_id, file_name)
if not exists:
key_moments = generate_key_moments(formatted_simple_transcript, formatted_transcript)
key_moments_json = {"key_moments": key_moments}
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
upload_content_directly(service, file_name, folder_id, key_moments_text)
print("key_momentsๅทฒไธŠๅ‚ณๅˆฐGoogle Drive")
else:
# key_momentsๅทฒๅญ˜ๅœจ๏ผŒไธ‹่ฝฝๅ†…ๅฎน
print("key_momentsๅทฒๅญ˜ๅœจไบŽGoogle Driveไธญ")
key_moments_text = download_file_as_string(service, file_id)
key_moments_json = json.loads(key_moments_text)
return key_moments_json
def generate_key_moments(formatted_simple_transcript, formatted_transcript):
print("===generate_key_moments===")
    # Use the LLM to extract key-moment summaries (with timestamps and keywords) from the transcript
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {formatted_simple_transcript} ๆ–‡ๆœฌ๏ผŒๆๅ–ๅ‡บ้‡้ปžๆ‘˜่ฆ๏ผŒไธฆ็ตฆๅ‡บๅฐๆ‡‰็š„ๆ™‚้–“่ปธ
1. ๅฐ็ฏ„ๅœๅˆ‡ๅ‡บไธๅŒๆฎต่ฝ็š„็›ธๅฐๆ‡‰ๆ™‚้–“่ปธ็š„้‡้ปžๆ‘˜่ฆ๏ผŒ
2. ๆฏไธ€ๅฐๆฎตๆœ€ๅคšไธ่ถ…้Ž 1/5 ็š„็ธฝๅ…งๅฎน๏ผŒไนŸๅฐฑๆ˜ฏๅคง็ด„ 3~5ๆฎต็š„้‡้ปž๏ผˆไพ‹ๅฆ‚ไบ”๏ฝžๅๅˆ†้˜็š„ๅฝฑ็‰‡ๅฐฑไธ€ๆฎตๅคง็ด„1~2ๅˆ†้˜๏ผŒๆœ€ๅคšไธ‰ๅˆ†้˜๏ผŒไฝ†ๅฆ‚ๆžœๆ˜ฏ่ถ…้Žๅๅˆ†้˜็š„ๅฝฑ็‰‡๏ผŒ้‚ฃไธ€ๅฐๆฎตๅคง็ด„ 2~3ๅˆ†้˜๏ผŒไปฅๆญค้กžๆŽจ๏ผ‰
3. ๆณจๆ„ไธ่ฆ้บๆผไปปไฝ•ไธ€ๆฎตๆ™‚้–“่ปธ็š„ๅ…งๅฎน ๅพž้›ถ็ง’้–‹ๅง‹
4. ๅฆ‚ๆžœ้ ญๅฐพ็š„ๆƒ…็ฏ€ไธๆ˜ฏ้‡้ปž๏ผŒ็‰นๅˆฅๆ˜ฏๆ‰“ๆ‹›ๅ‘ผๆˆ–ๆ˜ฏไป‹็ดนไบบ็‰ฉใ€ๆˆ–ๆ˜ฏsay goodbye ๅฐฑๆ˜ฏไธ้‡่ฆ็š„ๆƒ…็ฏ€๏ผŒๅฐฑไธ็”จๆ“ทๅ–
5. ไปฅ้€™็จฎๆ–นๅผๅˆ†ๆžๆ•ดๅ€‹ๆ–‡ๆœฌ๏ผŒๅพž้›ถ็ง’้–‹ๅง‹ๅˆ†ๆž๏ผŒ็›ดๅˆฐ็ตๆŸใ€‚้€™ๅพˆ้‡่ฆ
6. ้—œ้ตๅญ—ๅพžtranscript extract to keyword๏ผŒไฟ็•™ๅฐˆๅฎถๅๅญ—ใ€ๅฐˆๆฅญ่ก“่ชžใ€ๅนดไปฝใ€ๆ•ธๅญ—ใ€ๆœŸๅˆŠๅ็จฑใ€ๅœฐๅใ€ๆ•ธๅญธๅ…ฌๅผ
7. text, keywords please use or transfer to zh-TW, it's very important
Example: retrun JSON
{{key_moments:[{{
"start": "00:00",
"end": "01:00",
"text": "้€ๅญ—็จฟ็š„้‡้ปžๆ‘˜่ฆ",
"keywords": ["้—œ้ตๅญ—", "้—œ้ตๅญ—"]
}}]
}}
"""
segments = split_data(formatted_simple_transcript, word_base=100000)
all_content = []
for segment in segments:
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {segment} ๆ–‡ๆœฌ๏ผŒๆๅ–ๅ‡บ้‡้ปžๆ‘˜่ฆ๏ผŒไธฆ็ตฆๅ‡บๅฐๆ‡‰็š„ๆ™‚้–“่ปธ
1. ๅฐ็ฏ„ๅœๅˆ‡ๅ‡บไธๅŒๆฎต่ฝ็š„็›ธๅฐๆ‡‰ๆ™‚้–“่ปธ็š„้‡้ปžๆ‘˜่ฆ๏ผŒ
2. ๆฏไธ€ๅฐๆฎตๆœ€ๅคšไธ่ถ…้Ž 1/5 ็š„็ธฝๅ…งๅฎน๏ผŒไนŸๅฐฑๆ˜ฏๅคง็ด„ 3~5ๆฎต็š„้‡้ปž๏ผˆไพ‹ๅฆ‚ไบ”๏ฝžๅๅˆ†้˜็š„ๅฝฑ็‰‡ๅฐฑไธ€ๆฎตๅคง็ด„1~2ๅˆ†้˜๏ผŒๆœ€ๅคšไธ‰ๅˆ†้˜๏ผŒไฝ†ๅฆ‚ๆžœๆ˜ฏ่ถ…้Žๅๅˆ†้˜็š„ๅฝฑ็‰‡๏ผŒ้‚ฃไธ€ๅฐๆฎตๅคง็ด„ 2~3ๅˆ†้˜๏ผŒไปฅๆญค้กžๆŽจ๏ผ‰
3. ๆณจๆ„ไธ่ฆ้บๆผไปปไฝ•ไธ€ๆฎตๆ™‚้–“่ปธ็š„ๅ…งๅฎน ๅพž้›ถ็ง’้–‹ๅง‹
4. ๅฆ‚ๆžœ้ ญๅฐพ็š„ๆƒ…็ฏ€ไธๆ˜ฏ้‡้ปž๏ผŒ็‰นๅˆฅๆ˜ฏๆ‰“ๆ‹›ๅ‘ผๆˆ–ๆ˜ฏไป‹็ดนไบบ็‰ฉใ€ๆˆ–ๆ˜ฏsay goodbye ๅฐฑๆ˜ฏไธ้‡่ฆ็š„ๆƒ…็ฏ€๏ผŒๅฐฑไธ็”จๆ“ทๅ–
5. ไปฅ้€™็จฎๆ–นๅผๅˆ†ๆžๆ•ดๅ€‹ๆ–‡ๆœฌ๏ผŒๅพž้›ถ็ง’้–‹ๅง‹ๅˆ†ๆž๏ผŒ็›ดๅˆฐ็ตๆŸใ€‚้€™ๅพˆ้‡่ฆ
6. ้—œ้ตๅญ—ๅพžtranscript extract to keyword๏ผŒไฟ็•™ๅฐˆๅฎถๅๅญ—ใ€ๅฐˆๆฅญ่ก“่ชžใ€ๅนดไปฝใ€ๆ•ธๅญ—ใ€ๆœŸๅˆŠๅ็จฑใ€ๅœฐๅใ€ๆ•ธๅญธๅ…ฌๅผ
7. text, keywords please use or transfer zh-TW, it's very important
Example: retrun JSON
{{key_moments:[{{
"start": "00:00",
"end": "01:00",
"text": "้€ๅญ—็จฟ็š„้‡้ปžๆ‘˜่ฆ",
"keywords": ["้—œ้ตๅญ—", "้—œ้ตๅญ—"]
}}]
}}
"""
response_format = { "type": "json_object" }
content = generate_content_by_LLM(sys_content, user_content, response_format)
key_moments = json.loads(content)["key_moments"]
# "transcript": get text from formatted_simple_transcript
for moment in key_moments:
start_time = parse_time(moment['start'])
end_time = parse_time(moment['end'])
# ไฝฟ็”จ่ฝ‰ๆ›ๅพŒ็š„ timedelta ็‰ฉไปถ้€ฒ่กŒๆ™‚้–“
moment['transcript'] = "๏ผŒ".join([entry['text'] for entry in formatted_simple_transcript
if start_time <= parse_time(entry['start_time']) <= end_time])
print("=====key_moments=====")
print(key_moments)
print("=====key_moments=====")
image_links = {entry['start_time']: entry['screenshot_path'] for entry in formatted_transcript}
for moment in key_moments:
start_time = parse_time(moment['start'])
end_time = parse_time(moment['end'])
# ไฝฟ็”จ่ฝ‰ๆ›ๅพŒ็š„ timedelta ็‰ฉไปถ้€ฒ่กŒๆ™‚้–“ๆฏ”่ผƒ
moment_images = [image_links[time] for time in image_links
if start_time <= parse_time(time) <= end_time]
moment['images'] = moment_images
all_content += key_moments
return all_content
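# After generate_key_moments, each moment dict carries: start, end, text, keywords,
# transcript (the joined segment text) and images (screenshot URLs within its time range).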
def generate_key_moments_keywords(transcript):
print("===generate_key_moments_keywords===")
segments = split_data(transcript, word_base=100000)
all_content = []
for segment in segments:
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
transcript extract to keyword
ไฟ็•™ๅฐˆๅฎถๅๅญ—ใ€ๅฐˆๆฅญ่ก“่ชžใ€ๅนดไปฝใ€ๆ•ธๅญ—ใ€ๆœŸๅˆŠๅ็จฑใ€ๅœฐๅใ€ๆ•ธๅญธๅ…ฌๅผใ€ๆ•ธๅญธ่กจ็คบๅผใ€็‰ฉ็†ๅŒ–ๅญธ็ฌฆ่™Ÿ๏ผŒ
ไธ็”จ็ตฆไธŠไธ‹ๆ–‡๏ผŒ็›ดๆŽฅ็ตฆๅ‡บ้—œ้ตๅญ—๏ผŒไฝฟ็”จ zh-TW๏ผŒ็”จ้€—่™Ÿๅˆ†้š”๏ผŒ example: ้—œ้ตๅญ—1, ้—œ้ตๅญ—2
transcript๏ผš{segment}
"""
content = generate_content_by_LLM(sys_content, user_content)
keywords = content.strip().split(",")
all_content += keywords
return all_content
def get_key_moments_html(key_moments):
css = """
<style>
#gallery-main {
display: flex;
align-items: center;
margin-bottom: 20px;
}
#gallery {
position: relative;
width: 50%;
flex: 1;
}
#text-content {
flex: 2;
margin-left: 20px;
}
#gallery #gallery-container{
position: relative;
width: 100%;
height: 0px;
padding-bottom: 56.7%; /* 16/9 ratio */
background-color: blue;
}
#gallery #gallery-container #gallery-content{
position: absolute;
top: 0px;
right: 0px;
bottom: 0px;
left: 0px;
height: 100%;
display: flex;
scroll-snap-type: x mandatory;
overflow-x: scroll;
scroll-behavior: smooth;
}
#gallery #gallery-container #gallery-content .gallery__item{
width: 100%;
height: 100%;
flex-shrink: 0;
scroll-snap-align: start;
scroll-snap-stop: always;
position: relative;
}
#gallery #gallery-container #gallery-content .gallery__item img{
display: block;
width: 100%;
height: 100%;
object-fit: contain;
background-color: white;
}
.click-zone{
position: absolute;
width: 20%;
height: 100%;
z-index: 3;
}
.click-zone.click-zone-prev{
left: 0px;
}
.click-zone.click-zone-next{
right: 0px;
}
#gallery:not(:hover) .arrow{
opacity: 0.8;
}
.arrow{
text-align: center;
z-index: 3;
position: absolute;
display: block;
width: 25px;
height: 25px;
line-height: 25px;
background-color: black;
border-radius: 50%;
text-decoration: none;
color: white !important;
opacity: 0.8;
transition: opacity 200ms ease;
}
.arrow:hover{
opacity: 1;
}
.arrow span{
position: relative;
top: 2px;
}
.arrow.arrow-prev{
top: 50%;
left: 5px;
}
.arrow.arrow-next{
top: 50%;
right: 5px;
}
.arrow.arrow-disabled{
opacity:0.8;
}
#text-content {
padding: 0px 36px;
}
#text-content p {
margin-top: 10px;
}
body{
font-family: sans-serif;
margin: 0px;
padding: 0px;
}
main{
padding: 0px;
margin: 0px;
max-width: 900px;
margin: auto;
}
.hidden{
border: 0;
clip: rect(0 0 0 0);
height: 1px;
margin: -1px;
overflow: hidden;
padding: 0;
position: absolute;
width: 1px;
}
@media (max-width: 768px) {
#gallery-main {
flex-direction: column; /* ๅœจๅฐๅฑๅน•ไธŠๅ †ๅ ๅ…ƒ็ด  */
}
#gallery {
width: 100%; /* ่ฎฉ็”ปๅปŠๅ ๆปกๆ•ดไธชๅฎนๅ™จๅฎฝๅบฆ */
}
#text-content {
margin-left: 0; /* ็งป้™คๅทฆ่พน่ท๏ผŒ่ฎฉๆ–‡ๆœฌๅ†…ๅฎนๅ ๆปกๅฎฝๅบฆ */
margin-top: 20px; /* ไธบๆ–‡ๆœฌๅ†…ๅฎนๆทปๅŠ ้กถ้ƒจ้—ด่ท */
}
#gallery #gallery-container {
height: 350px; /* ๆˆ–่€…ไฝ ๅฏไปฅ่ฎพ็ฝฎไธ€ไธชๅ›บๅฎš็š„้ซ˜ๅบฆ๏ผŒ่€Œไธๆ˜ฏ็”จ padding-bottom */
padding-bottom: 0; /* ็งป้™คๅบ•้ƒจๅกซๅ…… */
}
}
</style>
"""
key_moments_html = css
for i, moment in enumerate(key_moments):
images = moment['images']
image_elements = ""
for j, image in enumerate(images):
current_id = f"img_{i}_{j}"
prev_id = f"img_{i}_{j-1}" if j-1 >= 0 else f"img_{i}_{len(images)-1}"
next_id = f"img_{i}_{j+1}" if j+1 < len(images) else f"img_{i}_0"
image_elements += f"""
<div id="{current_id}" class="gallery__item">
<a href="#{prev_id}" class="click-zone click-zone-prev">
<div class="arrow arrow-disabled arrow-prev"> โ—€๏ธŽ </div>
</a>
<a href="#{next_id}" class="click-zone click-zone-next">
<div class="arrow arrow-next"> โ–ถ๏ธŽ </div>
</a>
<img src="{image}">
</div>
"""
gallery_content = f"""
<div id="gallery-content">
{image_elements}
</div>
"""
key_moments_html += f"""
<div class="gallery-container" id="gallery-main">
<div id="gallery"><!-- gallery start -->
<div id="gallery-container">
{gallery_content}
</div>
</div>
<div id="text-content">
<h3>{moment['start']} - {moment['end']}</h3>
<p><strong>ๆ‘˜่ฆ: {moment['text']} </strong></p>
<p>ๅ…งๅฎน: {moment['transcript']}</p>
</div>
</div>
"""
return key_moments_html
# ---- LLM CRUD ----
def get_LLM_content(video_id, kind):
print(f"===get_{kind}===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_{kind}.json'
blob_name = f"{video_id}/{file_name}"
    # Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if is_file_exists:
content = GCS_SERVICE.download_as_string(bucket_name, blob_name)
content_json = json.loads(content)
if kind == "reading_passage_latex":
content_text = content_json["reading_passage"]
elif kind == "summary_markdown":
content_text = content_json["summary"]
else:
content_text = json.dumps(content_json, ensure_ascii=False, indent=2)
else:
content_text = ""
return content_text
def enable_edit_mode():
return gr.update(interactive=True)
def delete_LLM_content(video_id, kind):
print(f"===delete_{kind}===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_{kind}.json'
blob_name = f"{video_id}/{file_name}"
    # Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if is_file_exists:
GCS_SERVICE.delete_blob(bucket_name, blob_name)
print(f"{file_name}ๅทฒไปŽGCSไธญๅˆ ้™ค")
return gr.update(value="", interactive=False)
def update_LLM_content(video_id, new_content, kind):
print(f"===upfdate kind on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_{kind}.json'
blob_name = f"{video_id}/{file_name}"
if kind == "reading_passage_latex":
print("=========reading_passage=======")
print(new_content)
reading_passage_json = {"reading_passage": str(new_content)}
reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, reading_passage_text)
updated_content = new_content
elif kind == "summary_markdown":
summary_json = {"summary": str(new_content)}
summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, summary_text)
updated_content = new_content
elif kind == "mind_map":
mind_map_json = {"mind_map": str(new_content)}
mind_map_text = json.dumps(mind_map_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, mind_map_text)
updated_content = mind_map_text
elif kind == "key_moments":
# from update_LLM_btn -> new_content is a string
# create_LLM_content -> new_content is a list
if isinstance(new_content, str):
key_moments_list = json.loads(new_content)
else:
key_moments_list = new_content
key_moments_json = {"key_moments": key_moments_list}
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, key_moments_text)
updated_content = key_moments_text
elif kind == "transcript":
if isinstance(new_content, str):
transcript_json = json.loads(new_content)
else:
transcript_json = new_content
transcript_text = json.dumps(transcript_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, transcript_text)
updated_content = transcript_text
elif kind == "questions":
# from update_LLM_btn -> new_content is a string
# create_LLM_content -> new_content is a list
if isinstance(new_content, str):
questions_json = json.loads(new_content)
else:
questions_json = new_content
questions_text = json.dumps(questions_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, questions_text)
updated_content = questions_text
elif kind == "questions_answers":
# from update_LLM_btn -> new_content is a string
# create_LLM_content -> new_content is a list
if isinstance(new_content, str):
questions_answers_json = json.loads(new_content)
else:
questions_answers_json = new_content
questions_answers_text = json.dumps(questions_answers_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, questions_answers_text)
updated_content = questions_answers_text
elif kind == "ai_content_list":
if isinstance(new_content, str):
ai_content_json = json.loads(new_content)
else:
ai_content_json = new_content
ai_content_text = json.dumps(ai_content_json, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, ai_content_text)
updated_content = ai_content_text
print(f"{kind} ๅทฒๆ›ดๆ–ฐๅˆฐGCS")
return gr.update(value=updated_content, interactive=False)
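# Hedged example of a key_moments update (comment only). The field names mirror how
# key_moments entries are read elsewhere in this file; the exact schema is an assumption:
# update_LLM_content("demo_video_id", [
#     {"start": "00:00:10", "end": "00:01:00", "text": "ๆ‘˜่ฆ...", "transcript": "...", "images": []}
# ], "key_moments")
# With kind="key_moments" the list is wrapped as {"key_moments": [...]} before upload.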
def create_LLM_content(video_id, df_string, kind):
print(f"===create_{kind}===")
print(f"video_id: {video_id}")
if kind == "reading_passage_latex":
content = generate_reading_passage(df_string)
update_LLM_content(video_id, content, kind)
elif kind == "summary_markdown":
meta_data = get_meta_data(video_id)
content = generate_summarise(df_string, meta_data)
update_LLM_content(video_id, content, kind)
elif kind == "mind_map":
content = generate_mind_map(df_string)
update_LLM_content(video_id, content, kind)
elif kind == "key_moments":
if isinstance(df_string, str):
transcript = json.loads(df_string)
else:
transcript = df_string
formatted_simple_transcript = create_formatted_simple_transcript(transcript)
formatted_transcript = create_formatted_transcript(video_id, transcript)
gen_content = generate_key_moments(formatted_simple_transcript, formatted_transcript)
update_LLM_content(video_id, gen_content, kind)
content = json.dumps(gen_content, ensure_ascii=False, indent=2)
elif kind == "transcript":
gen_content = process_transcript_and_screenshots_on_gcs(video_id)
update_LLM_content(video_id, gen_content, kind)
content = json.dumps(gen_content, ensure_ascii=False, indent=2)
elif kind == "questions":
gen_content = generate_questions(df_string)
update_LLM_content(video_id, gen_content, kind)
content = json.dumps(gen_content, ensure_ascii=False, indent=2)
elif kind == "questions_answers":
if isinstance(df_string, str):
transcript = json.loads(df_string)
else:
transcript = df_string
formatted_simple_transcript = create_formatted_simple_transcript(transcript)
gen_content = generate_questions_answers(formatted_simple_transcript)
update_LLM_content(video_id, gen_content, kind)
content = json.dumps(gen_content, ensure_ascii=False, indent=2)
return gr.update(value=content, interactive=False)
# ---- LLM refresh CRUD ----
def reading_passage_add_latex_version(video_id):
# ็ขบ่ช GCS ๆ˜ฏๅฆๆœ‰ reading_passage.json
print("===reading_passage_convert_to_latex===")
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_reading_passage.json'
blob_name = f"{video_id}/{file_name}"
print(f"blob_name: {blob_name}")
# Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
raise gr.Error("reading_passage ไธๅญ˜ๅœจ!")
# The reading passage already exists; download it and convert it to LaTeX
print("reading_passage ๅทฒๅญ˜ๅœจไบŽGCSไธญ๏ผŒ่ฝ‰ๆ› Latex ๆจกๅผ")
reading_passage_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
reading_passage_json = json.loads(reading_passage_text)
original_reading_passage = reading_passage_json["reading_passage"]
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {original_reading_passage}
ๆ•˜่ฟฐไธญ๏ผŒ่ซ‹ๆŠŠๆ•ธๅญธๆˆ–ๆ˜ฏๅฐˆๆฅญ่ก“่ชž๏ผŒ็”จ Latex ๅŒ…่ฆ†๏ผˆ$...$๏ผ‰๏ผŒ็›ก้‡ไธ่ฆๅŽปๆ”นๅŽŸๆœฌ็š„ๆ–‡็ซ 
ๅŠ ๆธ›ไน˜้™คใ€ๆ น่™Ÿใ€ๆฌกๆ–นใ€ๅŒ–ๅญธ็ฌฆ่™Ÿใ€็‰ฉ็†็ฌฆ่™Ÿ็ญ‰็ญ‰็š„้‹็ฎ—ๅผๅฃ่ชžไนŸๆ›ๆˆ LATEX ็ฌฆ่™Ÿ
่ซ‹ไธ€ๅฎš่ฆไฝฟ็”จ็น้ซ”ไธญๆ–‡ zh-TW๏ผŒไธฆ็”จๅฐ็ฃไบบ็š„ๅฃ่ชž
็”ข็”Ÿ็š„็ตๆžœไธ่ฆๅ‰ๅพŒๆ–‡่งฃ้‡‹๏ผŒไนŸไธ่ฆๆ•˜่ฟฐ้€™็ฏ‡ๆ–‡็ซ ๆ€Ž้บผ็”ข็”Ÿ็š„
ๅช้œ€่ฆๅฐˆๆณจๆไพ› Reading Passage๏ผŒๅญ—ๆ•ธๅœจ 200~500 ๅญ—ไปฅๅ…ง
"""
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
request_payload = {
"model": "gpt-4-turbo",
"messages": messages,
"max_tokens": 4000,
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
new_reading_passage = response.choices[0].message.content.strip()
print("=====new_reading_passage=====")
print(new_reading_passage)
print("=====new_reading_passage=====")
reading_passage_json["reading_passage"] = new_reading_passage
reading_passage_text = json.dumps(reading_passage_json, ensure_ascii=False, indent=2)
# ๅฆๅญ˜็‚บ reading_passage_latex.json
new_file_name = f'{video_id}_reading_passage_latex.json'
new_blob_name = f"{video_id}/{new_file_name}"
GCS_SERVICE.upload_json_string(bucket_name, new_blob_name, reading_passage_text)
return new_reading_passage
def summary_add_markdown_version(video_id):
# ็ขบ่ช GCS ๆ˜ฏๅฆๆœ‰ summary.json
print("===summary_convert_to_markdown===")
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_summary.json'
blob_name = f"{video_id}/{file_name}"
print(f"blob_name: {blob_name}")
# Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
raise gr.Error("summary ไธๅญ˜ๅœจ!")
# The summary already exists; download it and convert it to Markdown
print("summary ๅทฒๅญ˜ๅœจไบŽGCSไธญ๏ผŒ่ฝ‰ๆ› Markdown ๆจกๅผ")
summary_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
summary_json = json.loads(summary_text)
original_summary = summary_json["summary"]
sys_content = "ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ท่ณ‡ๆ–™ๅˆ†ๆž่ทŸๅฝฑ็‰‡ๆ•™ๅญธ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ๏ผŒ่ซ‹็ฒพ่ฎ€่ณ‡ๆ–™ๆ–‡ๆœฌ๏ผŒ่‡ช่กŒๅˆคๆ–ท่ณ‡ๆ–™็š„็จฎ้กž๏ผŒไฝฟ็”จ zh-TW"
user_content = f"""
่ซ‹ๆ นๆ“š {original_summary}
่ฝ‰ๆ›ๆ ผๅผ็‚บ Markdown
ๅชไฟ็•™๏ผš๐Ÿ“š ๆ•ด้ซ”ๆ‘˜่ฆใ€๐Ÿ”– ้‡้ปžๆฆ‚ๅฟตใ€๐Ÿ’ก ็‚บไป€้บผๆˆ‘ๅ€‘่ฆๅญธ้€™ๅ€‹ใ€โ“ ๅปถไผธๅฐๅ•้กŒ
ๅ…ถไป–็š„ไธ่ฆไฟ็•™
ๆ•ด้ซ”ๆ‘˜่ฆๅœจไธ€็™พๅญ—ไปฅๅ…ง
้‡้ปžๆฆ‚ๅฟต่ฝ‰ๆˆ bullet points
ไปฅๅŠๅฏ่ƒฝ็š„็ต่ซ–่ˆ‡็ตๅฐพๅปถไผธๅฐๅ•้กŒๆไพ›ๅญธ็”Ÿไฝœๅๆ€
ๆ•˜่ฟฐไธญ๏ผŒ่ซ‹ๆŠŠๆ•ธๅญธๆˆ–ๆ˜ฏๅฐˆๆฅญ่ก“่ชž๏ผŒ็”จ Latex ๅŒ…่ฆ†๏ผˆ$...$๏ผ‰
ๅŠ ๆธ›ไน˜้™คใ€ๆ น่™Ÿใ€ๆฌกๆ–น็ญ‰็ญ‰็š„้‹็ฎ—ๅผๅฃ่ชžไนŸๆ›ๆˆ LATEX ๆ•ธๅญธ็ฌฆ่™Ÿ
ๆ•ด้ซ”ๆ ผๅผ็‚บ๏ผš
## ๐Ÿ“š ๆ•ด้ซ”ๆ‘˜่ฆ
- (ไธ€ๅ€‹ bullet point....)
## ๐Ÿ”– ้‡้ปžๆฆ‚ๅฟต
- xxx
- xxx
- xxx
## ๐Ÿ’ก ็‚บไป€้บผๆˆ‘ๅ€‘่ฆๅญธ้€™ๅ€‹๏ผŸ
- (ไธ€ๅ€‹ bullet point....)
## โ“ ๅปถไผธๅฐๅ•้กŒ
- (ไธ€ๅ€‹ bullet point....)
"""
messages = [
{"role": "system", "content": sys_content},
{"role": "user", "content": user_content}
]
request_payload = {
"model": "gpt-4-turbo",
"messages": messages,
"max_tokens": 4000,
}
response = OPEN_AI_CLIENT.chat.completions.create(**request_payload)
new_summary = response.choices[0].message.content.strip()
print("=====new_summary=====")
print(new_summary)
print("=====new_summary=====")
summary_json["summary"] = new_summary
summary_text = json.dumps(summary_json, ensure_ascii=False, indent=2)
# ๅฆๅญ˜็‚บ summary_markdown.json
new_file_name = f'{video_id}_summary_markdown.json'
new_blob_name = f"{video_id}/{new_file_name}"
GCS_SERVICE.upload_json_string(bucket_name, new_blob_name, summary_text)
return new_summary
# ---- AI-generated teaching materials ----
def get_meta_data(video_id, source="gcs"):
if source == "gcs":
print("===get_meta_data on gcs===")
gcs_client = GCS_CLIENT
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_meta_data.json'
blob_name = f"{video_id}/{file_name}"
# Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
meta_data_json = {
"subject": "",
"grade": "",
}
print("meta_data empty return")
else:
# meta_data already exists; download its content
print("meta_dataๅทฒๅญ˜ๅœจไบŽGCSไธญ")
meta_data_text = GCS_SERVICE.download_as_string(bucket_name, blob_name)
meta_data_json = json.loads(meta_data_text)
# Convert the numeric grade in meta_data_json to its text label
grade = meta_data_json["grade"]
case = {
1: "ไธ€ๅนด็ดš",
2: "ไบŒๅนด็ดš",
3: "ไธ‰ๅนด็ดš",
4: "ๅ››ๅนด็ดš",
5: "ไบ”ๅนด็ดš",
6: "ๅ…ญๅนด็ดš",
7: "ไธƒๅนด็ดš",
8: "ๅ…ซๅนด็ดš",
9: "ไนๅนด็ดš",
10: "ๅๅนด็ดš",
11: "ๅไธ€ๅนด็ดš",
12: "ๅไบŒๅนด็ดš",
}
grade_text = case.get(grade, "")
meta_data_json["grade"] = grade_text
return meta_data_json
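# Assumed shape of {video_id}_meta_data.json, inferred from the fields read above:
# {"subject": "ๆ•ธๅญธ", "grade": 7}
# get_meta_data maps the numeric grade to its label before returning, e.g. 7 -> "ไธƒๅนด็ดš".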
def get_ai_content(password, video_id, df_string, topic, grade, level, specific_feature, content_type, source="gcs"):
verify_password(password)
if source == "gcs":
print("===get_ai_content on gcs===")
bucket_name = 'video_ai_assistant'
file_name = f'{video_id}_ai_content_list.json'
blob_name = f"{video_id}/{file_name}"
# Check whether the file exists
is_file_exists = GCS_SERVICE.check_file_exists(bucket_name, blob_name)
if not is_file_exists:
# Create an empty ai_content_list.json first
ai_content_list = []
ai_content_text = json.dumps(ai_content_list, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, ai_content_text)
print("ai_content_list [] ๅทฒไธŠๅ‚ณๅˆฐGCS")
# At this point ai_content_list is guaranteed to exist
ai_content_list_string = GCS_SERVICE.download_as_string(bucket_name, blob_name)
ai_content_list = json.loads(ai_content_list_string)
# Look up existing ai_content by key (video_id, level, specific_feature, content_type)
target_kvs = {
"video_id": video_id,
"level": level,
"specific_feature": specific_feature,
"content_type": content_type
}
ai_content_json = [
item for item in ai_content_list
if all(item[k] == v for k, v in target_kvs.items())
]
if len(ai_content_json) == 0:
ai_content, prompt = generate_ai_content(password, df_string, topic, grade, level, specific_feature, content_type)
ai_content_json = {
"video_id": video_id,
"content": str(ai_content),
"prompt": prompt,
"level": level,
"specific_feature": specific_feature,
"content_type": content_type
}
ai_content_list.append(ai_content_json)
ai_content_text = json.dumps(ai_content_list, ensure_ascii=False, indent=2)
GCS_SERVICE.upload_json_string(bucket_name, blob_name, ai_content_text)
print("ai_contentๅทฒไธŠๅ‚ณๅˆฐGCS")
else:
ai_content_json = ai_content_json[-1]
ai_content = ai_content_json["content"]
prompt = ai_content_json["prompt"]
return ai_content, ai_content, prompt, prompt
def generate_ai_content(password, df_string, topic, grade, level, specific_feature, content_type):
verify_password(password)
material = EducationalMaterial(df_string, topic, grade, level, specific_feature, content_type)
prompt = material.generate_content_prompt()
try:
ai_content = material.get_ai_content(OPEN_AI_CLIENT, ai_type="openai")
except Exception as e:
error_msg = f" {video_id} OPEN AI ็”Ÿๆˆๆ•™ๅญธ็ด ๆ้Œฏ่ชค: {str(e)}"
print("===generate_ai_content error===")
print(error_msg)
print("===generate_ai_content error===")
ai_content = material.get_ai_content(BEDROCK_CLIENT, ai_type="bedrock")
return ai_content, prompt
def generate_exam_fine_tune_result(password, exam_result_prompt , df_string_output, exam_result, exam_result_fine_tune_prompt):
verify_password(password)
material = EducationalMaterial(df_string_output, "", "", "", "", "")
try:
fine_tuned_ai_content = material.get_fine_tuned_ai_content(OPEN_AI_CLIENT, "openai", exam_result_prompt, exam_result, exam_result_fine_tune_prompt)
except Exception:
fine_tuned_ai_content = material.get_fine_tuned_ai_content(BEDROCK_CLIENT, "bedrock", exam_result_prompt, exam_result, exam_result_fine_tune_prompt)
return fine_tuned_ai_content
def return_original_exam_result(exam_result_original):
return exam_result_original
def create_word(content):
unique_filename = str(uuid.uuid4())
word_file_path = f"/tmp/{unique_filename}.docx"
doc = Document()
doc.add_paragraph(content)
doc.save(word_file_path)
return word_file_path
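# Minimal sketch: create_word("ๅญธ็ฟ’ๅ–ฎๅ…งๅฎน...") writes a single-paragraph .docx under /tmp with a
# uuid4 filename and returns its path; download_exam_result simply forwards that path to the UI.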
def download_exam_result(content):
word_path = create_word(content)
return word_path
# ---- Chatbot ----
def get_instructions(content_subject, content_grade, key_moments, socratic_mode=True):
if socratic_mode:
method = "Socratic style, guide thinking, no direct answers. this is very important, please be seriously following."
else:
method = "direct answers, but encourage user to think more."
instructions = f"""
subject: {content_subject}
grade: {content_grade}
context: {key_moments}
Assistant Role: you are a {content_subject} assistant. You may call yourself the {content_subject} ๅญธไผด.
User Role: a {content_grade}th-grade student.
Method: {method}
Language: Traditional Chinese zh-TW (this is very important), suitable for a {content_grade}th-grade level.
Response:
- if the user says hi, hello, or any greeting, greet them back and introduce yourself, then invite them to ask a question about the context.
- Ask a single question, under 100 characters.
- include math symbols (use LaTeX $ to wrap them, ex: $x^2$)
- hint with a video timestamp in the format ใ€ๅƒ่€ƒ๏ผš00:00:00ใ€‘.
- Sometimes encourage the user in a relaxed, Taiwanese style.
- if the user asks a question not covered by the context,
- just tell them to ask a question within the context and give them an example question.
Restrictions: Answer within video content, no external references
"""
return instructions
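# Illustrative call (the values are placeholders, not project data):
# instructions = get_instructions("ๆ•ธๅญธ", "7", '[{"start": "00:00:10", "text": "..."}]', socratic_mode=True)
# key_moments is interpolated as plain text into the prompt, so any pre-serialized JSON string works.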
def chat_with_any_ai(ai_type, password, video_id, user_data, transcript_state, key_moments, user_message, chat_history, content_subject, content_grade, questions_answers_json, socratic_mode=False, thread_id=None, ai_name=None):
print(f"ai_type: {ai_type}")
print(f"user_data: {user_data}")
verify_password(password)
verify_message_length(user_message, max_length=1500)
is_questions_answers_exists, question_message, answer_message = check_questions_answers(user_message, questions_answers_json)
if is_questions_answers_exists:
chat_history = update_chat_history(question_message, answer_message, chat_history)
send_btn_update, send_feedback_btn_update = update_send_and_feedback_buttons(chat_history, CHAT_LIMIT)
time.sleep(3)
return "", chat_history, send_btn_update, send_feedback_btn_update, thread_id
verify_chat_limit(chat_history, CHAT_LIMIT)
if ai_type == "chat_completions":
chatbot_config = get_chatbot_config(ai_name, transcript_state, key_moments, content_subject, content_grade, video_id, socratic_mode)
chatbot = Chatbot(chatbot_config)
response_text = chatbot.chat(user_message, chat_history)
thread_id = ""
elif ai_type == "assistant":
client = OPEN_AI_CLIENT
assistant_id = OPEN_AI_ASSISTANT_ID_GPT4 #GPT 4 turbo
if isinstance(key_moments, str):
key_moments_json = json.loads(key_moments)
else:
key_moments_json = key_moments
# key_moments_json remove images
for moment in key_moments_json:
moment.pop('images', None)
moment.pop('end', None)
moment.pop('transcript', None)
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False)
instructions = get_instructions(content_subject, content_grade, key_moments_text, socratic_mode)
print(f"=== instructions:{instructions} ===")
metadata={
"video_id": video_id,
"user_data": user_data,
"content_subject": content_subject,
"content_grade": content_grade,
"socratic_mode": str(socratic_mode),
"assistant_id": assistant_id,
"is_streaming": "false",
}
user_message_note = "/n ่ซ‹ๅšดๆ ผ้ตๅพชinstructions๏ผŒๆ“”ไปปไธ€ไฝ่˜‡ๆ ผๆ‹‰ๅบ•ๅฎถๆ•™๏ผŒ็ต•ๅฐไธ่ฆ้‡่ค‡ user ็š„ๅ•ๅฅ๏ผŒ่ซ‹็”จๅผ•ๅฐŽ็š„ๆ–นๅผๆŒ‡ๅผ•ๆ–นๅ‘๏ผŒ่ซ‹ไธ€ๅฎš่ฆ็”จ็น้ซ”ไธญๆ–‡ๅ›ž็ญ” zh-TW๏ผŒไธฆ็”จๅฐ็ฃไบบ็š„็ฆฎ่ฒŒๅฃ่ชž่กจ้”๏ผŒๅ›ž็ญ”ๆ™‚ไธ่ฆ็‰นๅˆฅ่ชชๆ˜Ž้€™ๆ˜ฏๅฐ็ฃไบบ็š„่ชžๆฐฃ๏ผŒ่ซ‹ๅœจๅ›ž็ญ”็š„ๆœ€ๅพŒๆจ™่จปใ€ๅƒ่€ƒ๏ผš๏ผˆๆ™‚๏ผ‰:๏ผˆๅˆ†๏ผ‰:๏ผˆ็ง’๏ผ‰ใ€‘๏ผŒ๏ผˆๅฆ‚ๆžœๆ˜ฏๅๅ•ๅญธ็”Ÿ๏ผŒๅฐฑๅชๅ•ไธ€ๅ€‹ๅ•้กŒ๏ผŒ่ซ‹ๅนซๅŠฉๅญธ็”Ÿๆ›ดๅฅฝ็š„็†่งฃ่ณ‡ๆ–™๏ผŒๅญ—ๆ•ธๅœจ100ๅญ—ไปฅๅ…ง๏ผŒๅ›ž็ญ”ๆ™‚ๅฆ‚ๆžœ่ฌ›ๅˆฐๆ•ธๅญธๅฐˆๆœ‰ๅ่ฉž๏ผŒ่ซ‹็”จๆ•ธๅญธ็ฌฆ่™Ÿไปฃๆ›ฟๆ–‡ๅญ—๏ผˆLatex ็”จ $ ๅญ—่™Ÿ render, ex: $x^2$)"
user_content = user_message + user_message_note
response_text, thread_id = handle_conversation_by_open_ai_assistant(client, user_content, instructions, assistant_id, thread_id, metadata, fallback=True)
# ๆ›ดๆ–ฐ่ŠๅคฉๅŽ†ๅฒ
chat_history = update_chat_history(user_message, response_text, chat_history)
send_btn_update, send_feedback_btn_update = update_send_and_feedback_buttons(chat_history, CHAT_LIMIT)
# ่ฟ”ๅ›ž่ŠๅคฉๅŽ†ๅฒๅ’Œ็ฉบๅญ—็ฌฆไธฒๆธ…็ฉบ่พ“ๅ…ฅๆก†
return "", chat_history, send_btn_update, send_feedback_btn_update, thread_id
def get_chatbot_config(ai_name, transcript_state, key_moments, content_subject, content_grade, video_id, socratic_mode=True):
if ai_name not in ["foxcat", "lili", "maimai"]:
ai_name = "foxcat"
ai_name_clients_model = {
"foxcat": {
"ai_name": "foxcat",
"ai_client": GROQ_CLIENT,
"ai_model_name": "groq_llama3",
},
"lili": {
"ai_name": "lili",
"ai_client": BEDROCK_CLIENT,
"ai_model_name": "claude3",
},
"maimai": {
"ai_name": "maimai",
"ai_client": GROQ_CLIENT,
"ai_model_name": "groq_mixtral",
}
}
ai_config = ai_name_clients_model.get(ai_name, ai_name_clients_model["foxcat"])
ai_client = ai_config["ai_client"]
ai_model_name = ai_config["ai_model_name"]
if isinstance(transcript_state, str):
simple_transcript = json.loads(transcript_state)
else:
simple_transcript = transcript_state
if isinstance(key_moments, str):
key_moments_json = json.loads(key_moments)
else:
key_moments_json = key_moments
# key_moments_json remove images
for moment in key_moments_json:
moment.pop('images', None)
moment.pop('end', None)
moment.pop('transcript', None)
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False)
instructions = get_instructions(content_subject, content_grade, key_moments_text, socratic_mode)
chatbot_config = {
"video_id": video_id,
"transcript": simple_transcript,
"key_moments": key_moments,
"content_subject": content_subject,
"content_grade": content_grade,
"jutor_chat_key": JUTOR_CHAT_KEY,
"ai_model_name": ai_model_name,
"ai_client": ai_client,
"instructions": instructions
}
return chatbot_config
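# Minimal sketch (an ai_name other than "foxcat"/"lili"/"maimai" falls back to "foxcat";
# the transcript and key_moments values are placeholders):
# config = get_chatbot_config("lili", transcript_state, key_moments, "ๆ•ธๅญธ", "7", "demo_video_id")
# chatbot = Chatbot(config)  # as chat_with_any_ai does above for ai_type == "chat_completions"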
def feedback_with_ai(ai_type, chat_history, thread_id=None):
# prompt: ่ซ‹ไพๆ“šไปฅไธŠ็š„ๅฐ่ฉฑ(chat_history)๏ผŒ็ธฝ็ตๆˆ‘็š„ใ€Œๆๅ•ๅŠ›ใ€๏ผŒไธฆ็ตฆไบˆๆˆ‘ๆ˜ฏๅฆๆœ‰ใ€Œๅ•ๅฐๅ•้กŒใ€็š„ๅ›ž้ฅ‹ๅ’Œๅปบ่ญฐ
system_content = """
ไฝ ๆ˜ฏไธ€ๅ€‹ๆ“…้•ทๅผ•ๅฐŽๅ•็ญ”็ด ้คŠ็š„่€ๅธซ๏ผŒuser ็‚บๅญธ็”Ÿ็š„ๆๅ•่ทŸๅ›ž็ญ”๏ผŒ่ซ‹็ฒพ่ฎ€ๅฐ่ฉฑ้Ž็จ‹๏ผŒ้‡ๅฐ user ็ตฆไบˆๅ›ž้ฅ‹ๅฐฑๅฅฝ๏ผŒๆ นๆ“šไปฅไธ‹ Rule:
- ่ซ‹ไฝฟ็”จ็น้ซ”ไธญๆ–‡ zh-TW ็ธฝ็ต user ็š„ๆๅ•ๅŠ›๏ผŒไธฆ็ตฆไบˆๆ˜ฏๅฆๆœ‰ๅ•ๅฐๅ•้กŒ็š„ๅ›ž้ฅ‹ๅ’Œๅปบ่ญฐ
- ไธๆŽก่จˆใ€้ ่จญๆๅ•ใ€‘็š„ๅ•้กŒ๏ผŒๅฆ‚ๆžœ user ็š„ๆๅ•้ƒฝไพ†่‡ชใ€้ ่จญๆๅ•ใ€‘๏ผŒ่กจ้”็”จๆˆถๅ–„ๆ–ผไฝฟ็”จ็ณป็ตฑ๏ผŒ่ซ‹็ตฆไบˆๅ›ž้ฅ‹ไธฆ้ผ“ๅ‹ต user ่ฆช่‡ชๆๅ•ๆ›ดๅ…ท้ซ”็š„ๅ•้กŒ
- ๅฆ‚ๆžœ็”จๆˆถๆๅ•้ƒฝ็›ธ็•ถ็ฐก็Ÿญ๏ผŒ็”š่‡ณๅฐฑๆ˜ฏไธ€ๅ€‹ๅญ—ๆˆ–้ƒฝๆ˜ฏไธ€ๅ€‹ๆ•ธๅญ—๏ผˆๅƒๆ˜ฏ user: 1, user:2๏ผ‰๏ผŒ่ซ‹็ตฆไบˆๅ›ž้ฅ‹ไธฆๅปบ่ญฐ user ๆๅ•ๆ›ดๅ…ท้ซ”็š„ๅ•้กŒ
- ๅฆ‚ๆžœ็”จๆˆถๆๅ•ๅ…งๅฎนๅชๆœ‰็ฌฆ่™Ÿๆˆ–ๆ˜ฏไบ‚็ขผ๏ผŒๅƒๆ˜ฏ๏ผŸ,๏ผ, ..., 3bhwbqhfw2vve2 ็ญ‰๏ผŒ่ซ‹็ตฆไบˆๅ›ž้ฅ‹ไธฆๅปบ่ญฐ user ๆๅ•ๆ›ดๅ…ท้ซ”็š„ๅ•้กŒ
- ๅฆ‚ๆžœ็”จๆˆถๆๅ•ๅ…งๅฎนๆœ‰่‰ฒๆƒ…ใ€ๆšดๅŠ›ใ€ไป‡ๆจใ€ไธ็•ถ่จ€่ซ–็ญ‰๏ผŒ่ซ‹็ตฆไบˆๅšดๅŽฒ็š„ๅ›ž้ฅ‹ไธฆๅปบ่ญฐ user ๆๅ•ๆ›ดๅ…ท้ซ”็š„ๅ•้กŒ
- ไธฆ็”จ็ฌฌไบŒไบบ็จฑใ€Œไฝ ใ€ไพ†ไปฃ่กจ user
- ่ซ‹็ฆฎ่ฒŒ๏ผŒไธฆ็ตฆไบˆ้ผ“ๅ‹ต
"""
chat_history_conversation = ""
# Render each turn as "User: ..." / "Assistant: ..." lines
# Skip the first chat_history entry (the canned greeting)
for chat in chat_history[1:]:
user_message = chat[0]
assistant_message = chat[1]
chat_history_conversation += f"User: {user_message}\nAssistant: {assistant_message}\n"
feedback_request_message = "่ซ‹ไพๆ“šไปฅไธŠ็š„ๅฐ่ฉฑ๏ผŒ็ธฝ็ตๆˆ‘็š„ใ€Œๆๅ•ๅŠ›ใ€๏ผŒไธฆ็ตฆไบˆๆˆ‘ๆ˜ฏๅฆๆœ‰ใ€Œๅ•ๅฐๅ•้กŒใ€็š„ๅ›ž้ฅ‹ๅ’Œๅปบ่ญฐ"
user_content = f"""conversation: {chat_history_conversation}
{feedback_request_message}
ๆœ€ๅพŒๆ นๆ“šๆๅ•ๅŠ›่กจ็พ๏ผŒ็ตฆไบˆๆๅ•ๅปบ่ญฐใ€ๆๅ•่กจ็พ๏ผŒไธฆ็”จ emoji ไพ†่กจ็คบ่ฉ•ๅˆ†๏ผš
๐ŸŸข๏ผš๏ผˆ่กจ็พๅพˆๅฅฝ็š„ๅ›ž้ฅ‹๏ผŒ็ตฆไบˆๆญฃๅ‘่‚ฏๅฎš๏ผ‰
๐ŸŸก๏ผš๏ผˆ้‚„ๅฏไปฅๅŠ ๆฒน็š„็š„ๅ›ž้ฅ‹๏ผŒ็ตฆไบˆๆ˜Ž็ขบ็š„ๅปบ่ญฐ๏ผ‰
๐Ÿ”ด๏ผš๏ผˆ้žๅธธไธๆ‡‚ๆๅ•็š„ๅ›ž้ฅ‹๏ผŒ็ตฆไบˆ้ผ“ๅ‹ตไธฆ็ตฆๅ‡บๆ˜Ž็ขบ็คบ็ฏ„๏ผ‰
example:
ๅฆไธ€ๆ–น้ข๏ผŒไฝ ่กจ้”ใ€Œๆˆ‘ไธๆƒณๅญธไบ†ใ€้€™ๅ€‹ๆƒ…ๆ„Ÿ๏ผŒๅ…ถๅฏฆไนŸๆ˜ฏไธ€็จฎ้‡่ฆ็š„ๅ้ฅ‹ใ€‚้€™้กฏ็คบไฝ ๅฏ่ƒฝๆ„ŸๅˆฐๆŒซๆŠ˜ๆˆ–็–ฒๅ€ฆใ€‚ๅœจ้€™็จฎๆƒ…ๆณไธ‹๏ผŒ่กจ้”ๅ‡บไฝ ็š„ๆ„Ÿๅ—ๆ˜ฏๅฅฝ็š„๏ผŒไฝ†ๅฆ‚ๆžœ่ƒฝๅ…ท้ซ”่ชชๆ˜Žๆ˜ฏไป€้บผ่ฎ“ไฝ ๆ„Ÿๅˆฐ้€™ๆจฃ๏ผŒๆˆ–ๆ˜ฏๆœ‰ไป€้บผๅ…ท้ซ”็š„ๅญธ็ฟ’้šœ็ค™๏ผŒๆœƒๆ›ดๆœ‰ๅŠฉๆ–ผๆ‰พๅˆฐ่งฃๆฑบๆ–นๆกˆใ€‚
็ตฆไบˆไฝ ็š„ๅปบ่ญฐๆ˜ฏ๏ผŒๅ˜—่ฉฆๅœจๆๅ•ๆ™‚ๆ›ดๆ˜Ž็ขบไธ€ไบ›๏ผŒ้€™ๆจฃไธๅƒ…่ƒฝๅนซๅŠฉไฝ ็ฒๅพ—ๆ›ดๅฅฝ็š„ๅญธ็ฟ’ๆ”ฏๆŒ๏ผŒไนŸ่ƒฝๆ้ซ˜ไฝ ็š„ๅ•้กŒ่งฃๆฑบๆŠ€ๅทงใ€‚
......
ๆๅ•ๅปบ่ญฐ๏ผšๅœจๆๅ•ๆ™‚๏ผŒ่ฉฆ่‘—ๅ…ท้ซ”ไธฆๆธ…ๆ™ฐๅœฐ่กจ้”ไฝ ็š„้œ€ๆฑ‚ๅ’Œ็–‘ๆƒ‘๏ผŒ้€™ๆจฃ่ƒฝๆ›ดๆœ‰ๆ•ˆๅœฐๅพ—ๅˆฐๅนซๅŠฉใ€‚
ๆๅ•่กจ็พ๏ผšใ€๐ŸŸกใ€‘ๅŠ ๆฒน๏ผŒๆŒ็บŒ็ทด็ฟ’๏ผŒไฝ ็š„ๆๅ•ๅŠ›ๆœƒ่ถŠไพ†่ถŠๅฅฝ๏ผ
"""
client = OPEN_AI_CLIENT
if ai_type == "chat_completions":
model_name = "gpt-4-turbo"
response_text = handle_conversation_by_open_ai_chat_completions(client, model_name, user_content, system_content)
elif ai_type == "assistant":
assistant_id = OPEN_AI_ASSISTANT_ID_GPT4 #GPT 4 turbo
# assistant_id = OPEN_AI_ASSISTANT_ID_GPT3 #GPT 3.5 turbo
response_text, thread_id = handle_conversation_by_open_ai_assistant(client, user_content, system_content, assistant_id, thread_id, metadata=None, fallback=True)
chat_history = update_chat_history(feedback_request_message, response_text, chat_history)
feedback_btn_update = gr.update(value="ๅทฒๅ›ž้ฅ‹", interactive=False, variant="secondary")
return chat_history, feedback_btn_update
def handle_conversation_by_open_ai_chat_completions(client, model_name, user_content, system_content):
response = client.chat.completions.create(
model=model_name,
messages=[
{"role": "system", "content": system_content},
{"role": "user", "content": user_content}
],
max_tokens=4000,
)
response_text = response.choices[0].message.content.strip()
return response_text
def handle_conversation_by_open_ai_assistant(client, user_message, instructions, assistant_id, thread_id=None, metadata=None, fallback=False):
"""
Handles the creation and management of a conversation thread.
:param client: The OpenAI client object.
:param thread_id: The existing thread ID, if any.
:param user_message: The message from the user.
:param instructions: System instructions for the assistant.
:param assistant_id: ID of the assistant to use.
:param metadata: Additional metadata to add to the thread.
:param fallback: Whether to use a fallback method in case of failure.
:return: A string with the response text or an error message.
"""
try:
if not thread_id:
thread = client.beta.threads.create()
thread_id = thread.id
else:
thread = client.beta.threads.retrieve(thread_id)
if metadata:
client.beta.threads.update(thread_id=thread.id, metadata=metadata)
# Send the user message to the thread
client.beta.threads.messages.create(thread_id=thread.id, role="user", content=user_message)
# Run the assistant
run = client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant_id, instructions=instructions)
# Wait for the response
run_status = poll_run_status(run.id, thread.id, timeout=30)
if run_status == "completed":
messages = client.beta.threads.messages.list(thread_id=thread.id)
response_text = messages.data[0].content[0].text.value
else:
response_text = "ๅญธ็ฟ’็ฒพ้ˆๆœ‰้ปž็ดฏ๏ผŒ่ซ‹็จๅพŒๅ†่ฉฆ๏ผ"
except Exception as e:
if fallback:
response = client.chat.completions.create(
model="gpt-4-turbo",
messages=[
{"role": "system", "content": instructions},
{"role": "user", "content": user_message}
],
max_tokens=4000,
)
response_text = response.choices[0].message.content.strip()
else:
print(f"Error: {e}")
raise gr.Error(f"Error: {e}")
return response_text, thread_id
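# Hedged usage sketch (the assistant ID below is a placeholder, not a real ID):
# reply, thread_id = handle_conversation_by_open_ai_assistant(
#     OPEN_AI_CLIENT, "่ซ‹ๅนซๆˆ‘่งฃ้‡‹้€™ๆฎตๅ…งๅฎน", instructions, "asst_xxx", thread_id=None, fallback=True)
# With fallback=True, any Assistants API error degrades to a plain chat completion instead of raising.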
def verify_message_length(user_message, max_length=500):
# Validate the user's message length against max_length
if len(user_message) > max_length:
error_msg = f"ไฝ ็š„่จŠๆฏๅคช้•ทไบ†๏ผŒ่ซ‹็ธฎ็Ÿญ่จŠๆฏ้•ทๅบฆ่‡ณ {max_length} ๅญ—ไปฅๅ…ง"
raise gr.Error(error_msg)
def check_questions_answers(user_message, questions_answers_json):
"""ๆชขๆŸฅๅ•็ญ”ๆ˜ฏๅฆๅญ˜ๅœจ๏ผŒไธฆ่™•็†็›ธ้—œ้‚่ผฏ"""
is_questions_answers_exists = False
answer = ""
# ่งฃๆžๅ•็ญ”ๆ•ธๆ“š
if isinstance(questions_answers_json, str):
qa_data = json.loads(questions_answers_json)
else:
qa_data = questions_answers_json
question_message = ""
answer_message = ""
for qa in qa_data:
if user_message == qa["question"] and qa["answer"]:
is_questions_answers_exists = True
question_message = f"ใ€้ ่จญๅ•้กŒใ€‘{user_message}"
answer_message = qa["answer"]
print("=== in questions_answers_json==")
print(f"question: {qa['question']}")
print(f"answer: {answer_message}")
break # Exit the loop once a matching answer is found
return is_questions_answers_exists, question_message, answer_message
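# Assumed questions_answers_json shape, matching the keys accessed above:
# [{"question": "ไป€้บผๆ˜ฏ...๏ผŸ", "answer": "..."}, ...]
# A user message that exactly equals one of the preset questions gets the stored answer back
# without calling any model.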
def verify_chat_limit(chat_history, chat_limit):
if chat_history is not None and len(chat_history) > chat_limit:
error_msg = "ๆญคๆฌกๅฐ่ฉฑ่ถ…้ŽไธŠ้™๏ผˆๅฐ่ฉฑไธ€่ผช10ๆฌก๏ผ‰"
raise gr.Error(error_msg)
def update_chat_history(user_message, response, chat_history):
# ๆ›ดๆ–ฐ่Šๅคฉๆญทๅฒ็š„้‚่ผฏ
new_chat_history = (user_message, response)
if chat_history is None:
chat_history = [new_chat_history]
else:
chat_history.append(new_chat_history)
return chat_history
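# chat_history is kept as a list of (user_message, assistant_response) pairs, e.g.
# [("ไฝ ๅฅฝ", "ไฝ ๅฅฝ๏ผŒๆˆ‘ๆ˜ฏ...")]; the same pair layout is what the gr.Chatbot component renders.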
def update_send_and_feedback_buttons(chat_history, chat_limit):
# ่ฎก็ฎ—ๅ‘้€ๆฌกๆ•ฐ
send_count = len(chat_history) - 1
# ๆ นๆฎ่ŠๅคฉๅŽ†ๅฒ้•ฟๅบฆๆ›ดๆ–ฐๅ‘้€ๆŒ‰้’ฎๅ’Œๅ้ฆˆๆŒ‰้’ฎ
if len(chat_history) > chat_limit:
send_btn_value = f"ๅฐ่ฉฑไธŠ้™ ({send_count}/{chat_limit})"
send_btn_update = gr.update(value=send_btn_value, interactive=False)
send_feedback_btn_update = gr.update(visible=True)
else:
send_btn_value = f"็™ผ้€ ({send_count}/{chat_limit})"
send_btn_update = gr.update(value=send_btn_value, interactive=True)
send_feedback_btn_update = gr.update(visible=False)
return send_btn_update, send_feedback_btn_update
def process_open_ai_audio_to_chatbot(password, audio_url):
verify_password(password)
if audio_url:
with open(audio_url, "rb") as audio_file:
file_size = os.path.getsize(audio_url)
if file_size > 2000000:
raise gr.Error("ๆช”ๆกˆๅคงๅฐ่ถ…้Ž๏ผŒ่ซ‹ไธ่ฆ่ถ…้Ž 60็ง’")
else:
transcription = OPEN_AI_CLIENT.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="text"
)
# Log the raw transcription text
print("=== transcription ===")
print(transcription)
print("=== transcription ===")
# ็ขบ่ช response ๆ˜ฏๅฆๆœ‰ๆ•ธๅญธ็ฌฆ่™Ÿ๏ผŒprompt to LATEX $... $, ex: $x^2$
if transcription:
system_message = """ไฝ ๆ˜ฏๅฐˆๆฅญ็š„ LATEX ่ฝ‰ๆ›ๅธซ๏ผŒๆ“…้•ทๅฐ‡ๆ•ธๅญธ็ฌฆ่™Ÿใ€ๅ…ฌๅผ่ฝ‰ๆ›ๆˆ LATEX ๆ ผๅผ๏ผŒไธฆ็”จ LATEX ็ฌฆ่™Ÿ $...$ ๅŒ…่ฃน๏ผŒex: $x^2$
็ฏ„ไพ‹๏ผš
transcription: x็š„ๅนณๆ–นๅŠ  2x ๅŠ  1 ็ญ‰ๆ–ผ 0
่ฝ‰ๆˆ LATEX ๆ ผๅผ๏ผš$x^2 + 2x + 1 = 0$
"""
user_message = f"""transcription: {transcription}
่ซ‹ๅฐ‡ transcription ๅ…ง็š„ๆ•ธๅญธใ€ๅ…ฌๅผใ€้‹็ฎ—ๅผใ€ๅŒ–ๅญธๅผใ€็‰ฉ็† formula ๅ…งๅฎน่ฝ‰ๆ›ๆˆ LATEX ๆ ผๅผ
ๅ…ถไป–ๆ–‡ๅญ—้ƒฝไฟ็•™ๅŽŸๆจฃ
ไนŸไธ่ฆ็ตฆๅ‡บๅคš้ค˜็š„ๆ•˜่ฟฐ
"""
request = OPEN_AI_CLIENT.chat.completions.create(
model="gpt-4-turbo",
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": user_message}
],
max_tokens=4000,
)
response = request.choices[0].message.content.strip()
else:
response = ""
return response
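# Hedged note: whisper-1 with response_format="text" returns a plain string, so `transcription`
# is used directly as text above; the 2,000,000-byte guard is a rough stand-in for the UI's
# 60-second recording cap (an approximation, not an exact conversion).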
def poll_run_status(run_id, thread_id, timeout=600, poll_interval=5):
"""
Polls the status of a Run and handles different statuses appropriately.
:param run_id: The ID of the Run to poll.
:param thread_id: The ID of the Thread associated with the Run.
:param timeout: Maximum time to wait for the Run to complete, in seconds.
:param poll_interval: Time to wait between each poll, in seconds.
"""
client = OPEN_AI_CLIENT
start_time = time.time()
while time.time() - start_time < timeout:
run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
if run.status in ["completed", "cancelled", "failed"]:
print(f"Run completed with status: {run.status}")
break
elif run.status == "requires_action":
print("Run requires action. Performing required action...")
# Here, you would perform the required action, e.g., running functions
# and then submitting the outputs. This is simplified for this example.
# After performing the required action, you'd complete the action:
# OPEN_AI_CLIENT.beta.threads.runs.complete_required_action(...)
elif run.status == "expired":
print("Run expired. Exiting...")
break
else:
print(f"Run status is {run.status}. Waiting for updates...")
time.sleep(poll_interval)
else:
print("Timeout reached. Run did not complete in the expected time.")
# Once the Run is completed, handle the result accordingly
if run.status == "completed":
# Retrieve and handle messages or run steps as needed
messages = client.beta.threads.messages.list(thread_id=thread_id)
for message in messages.data:
if message.role == "assistant":
print(f"Assistant response: {message.content}")
elif run.status in ["cancelled", "failed"]:
# Handle cancellation or failure
print(f"Run ended with status: {run.status}")
elif run.status == "expired":
# Handle expired run
print("Run expired without completion.")
return run.status
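# Sketch of how the poller is used above (run/thread IDs come from the Assistants API):
# status = poll_run_status(run.id, thread.id, timeout=30)
# if status == "completed":
#     messages = OPEN_AI_CLIENT.beta.threads.messages.list(thread_id=thread.id)
#     # messages.data[0] is read as the latest assistant reply, assuming newest-first ordering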
def chat_with_opan_ai_assistant_streaming(user_message, chat_history, password, video_id, user_data, thread_id, trascript, key_moments, content_subject, content_grade, socratic_mode=True):
verify_password(password)
print("=====user_data=====")
print(f"user_data: {user_data}")
print("===chat_with_opan_ai_assistant_streaming===")
print(thread_id)
# Reject user messages longer than 1,500 characters
if len(user_message) > 1500:
error_msg = "ไฝ ็š„่จŠๆฏๅคช้•ทไบ†๏ผŒ่ซ‹็ธฎ็Ÿญ่จŠๆฏ้•ทๅบฆ่‡ณไธ€ๅƒไบ”็™พๅญ—ไปฅๅ…ง"
raise gr.Error(error_msg)
# If chat_history already exceeds CHAT_LIMIT, stop with an error
if chat_history is not None and len(chat_history) > CHAT_LIMIT:
error_msg = f"ๆญคๆฌกๅฐ่ฉฑ่ถ…้ŽไธŠ้™๏ผˆๅฐ่ฉฑไธ€่ผช{CHAT_LIMIT}ๆฌก๏ผ‰"
raise gr.Error(error_msg)
try:
assistant_id = OPEN_AI_ASSISTANT_ID_GPT4 #GPT 4 turbo
# assistant_id = OPEN_AI_ASSISTANT_ID_GPT3 #GPT 3.5 turbo
client = OPEN_AI_CLIENT
# The transcript could also be embedded directly in the instructions (kept below for reference, currently disabled)
# if isinstance(trascript, str):
# trascript_json = json.loads(trascript)
# else:
# trascript_json = trascript
# trascript_text = json.dumps(trascript_json, ensure_ascii=False)
# # trascript_text ็งป้™ค \n, ็ฉบ็™ฝ
# trascript_text = trascript_text.replace("\n", "").replace(" ", "")
if isinstance(key_moments, str):
key_moments_json = json.loads(key_moments)
else:
key_moments_json = key_moments
# key_moments_json remove images
for moment in key_moments_json:
moment.pop('images', None)
moment.pop('end', None)
moment.pop('transcript', None)
key_moments_text = json.dumps(key_moments_json, ensure_ascii=False)
instructions = get_instructions(content_subject, content_grade, key_moments_text, socratic_mode)
# Create a new thread or reuse the existing one
if not thread_id:
thread = client.beta.threads.create()
thread_id = thread.id
print(f"new thread_id: {thread_id}")
else:
thread = client.beta.threads.retrieve(thread_id)
print(f"old thread_id: {thread_id}")
client.beta.threads.update(
thread_id=thread_id,
metadata={
"youtube_id": video_id,
"user_data": user_data,
"content_subject": content_subject,
"content_grade": content_grade,
"assistant_id": assistant_id,
"is_streaming": "true",
}
)
# ๅ‘็บฟ็จ‹ๆทปๅŠ ็”จๆˆท็š„ๆถˆๆฏ
client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content=user_message + "/n ่ซ‹ๅšดๆ ผ้ตๅพชinstructions๏ผŒๆ“”ไปปไธ€ไฝ่˜‡ๆ ผๆ‹‰ๅบ•ๅฎถๆ•™๏ผŒ่ซ‹ไธ€ๅฎš่ฆ็”จ็น้ซ”ไธญๆ–‡ๅ›ž็ญ” zh-TW๏ผŒไธฆ็”จๅฐ็ฃไบบ็š„็ฆฎ่ฒŒๅฃ่ชž่กจ้”๏ผŒๅ›ž็ญ”ๆ™‚ไธ่ฆ็‰นๅˆฅ่ชชๆ˜Ž้€™ๆ˜ฏๅฐ็ฃไบบ็š„่ชžๆฐฃ๏ผŒไธ็”จๆๅˆฐใ€Œ้€ๅญ—็จฟใ€้€™ๅ€‹่ฉž๏ผŒ็”จใ€Œๅ…งๅฎนใ€ไปฃๆ›ฟ))๏ผŒ่ซ‹ๅœจๅ›ž็ญ”็š„ๆœ€ๅพŒๆจ™่จปใ€ๅƒ่€ƒ่ณ‡ๆ–™๏ผš๏ผˆๆ™‚๏ผ‰:๏ผˆๅˆ†๏ผ‰:๏ผˆ็ง’๏ผ‰ใ€‘๏ผŒ๏ผˆๅฆ‚ๆžœๆ˜ฏๅๅ•ๅญธ็”Ÿ๏ผŒๅฐฑๅชๅ•ไธ€ๅ€‹ๅ•้กŒ๏ผŒ่ซ‹ๅนซๅŠฉๅญธ็”Ÿๆ›ดๅฅฝ็š„็†่งฃ่ณ‡ๆ–™๏ผŒๅญ—ๆ•ธๅœจ100ๅญ—ไปฅๅ…ง๏ผ‰"
)
with client.beta.threads.runs.stream(
thread_id=thread.id,
assistant_id=assistant_id,
instructions=instructions,
) as stream:
partial_messages = ""
for event in stream:
if event.data and event.data.object == "thread.message.delta":
message = event.data.delta.content[0].text.value
partial_messages += message
yield partial_messages
except Exception as e:
print(f"Error: {e}")
raise gr.Error(f"Error: {e}")
def create_thread_id():
thread = OPEN_AI_CLIENT.beta.threads.create()
thread_id = thread.id
print(f"create new thread_id: {thread_id}")
return thread_id
def chatbot_select(chatbot_name):
chatbot_select_accordion_visible = gr.update(visible=False)
all_chatbot_select_btn_visible = gr.update(visible=True)
chatbot_open_ai_streaming_visible = gr.update(visible=False)
chatbot_ai_visible = gr.update(visible=False)
ai_name_update = gr.update(value="foxcat")
ai_chatbot_thread_id_update = gr.update(value="")
if chatbot_name == "chatbot_open_ai":
chatbot_ai_visible = gr.update(visible=True)
ai_chatbot_ai_type_update = gr.update(value="assistant")
elif chatbot_name == "chatbot_open_ai_streaming":
chatbot_open_ai_streaming_visible = gr.update(visible=True)
ai_chatbot_ai_type_update = gr.update(value="assistant_streaming")
else:
chatbot_ai_visible = gr.update(visible=True)
ai_chatbot_ai_type_update = gr.update(value="chat_completions")
ai_name_update = gr.update(value=chatbot_name)
return chatbot_select_accordion_visible, all_chatbot_select_btn_visible, \
chatbot_open_ai_streaming_visible, chatbot_ai_visible, \
ai_name_update, ai_chatbot_ai_type_update, ai_chatbot_thread_id_update
def update_avatar_images(avatar_images, chatbot_description_value):
value = [[
"่ซ‹ๅ•ไฝ ๆ˜ฏ่ชฐ๏ผŸ",
chatbot_description_value
]]
ai_chatbot_update = gr.update(avatar_images=avatar_images, value=value)
return ai_chatbot_update
def show_all_chatbot_accordion():
chatbot_select_accordion_visible = gr.update(visible=True)
all_chatbot_select_btn_visible = gr.update(visible=False)
return chatbot_select_accordion_visible, all_chatbot_select_btn_visible
# --- Slide mode ---
def update_slide(direction):
global TRANSCRIPTS
global CURRENT_INDEX
print("=== ๆ›ดๆ–ฐๆŠ•ๅฝฑ็‰‡ ===")
print(f"CURRENT_INDEX: {CURRENT_INDEX}")
# print(f"TRANSCRIPTS: {TRANSCRIPTS}")
CURRENT_INDEX += direction
if CURRENT_INDEX < 0:
CURRENT_INDEX = 0 # Clamp the index so it never drops below 0
elif CURRENT_INDEX >= len(TRANSCRIPTS):
CURRENT_INDEX = len(TRANSCRIPTS) - 1 # Clamp the index to the last entry
# Get the text and screenshot path of the current entry
current_transcript = TRANSCRIPTS[CURRENT_INDEX]
slide_image = current_transcript["screenshot_path"]
slide_text = current_transcript["text"]
return slide_image, slide_text
def prev_slide():
return update_slide(-1)
def next_slide():
return update_slide(1)
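# Assumed shape of each TRANSCRIPTS entry (only these two keys are read here):
# {"screenshot_path": "https://.../screenshot.png", "text": "..."}
# prev_slide/next_slide just clamp the global CURRENT_INDEX and return the matching image and text.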
# --- Init params ---
def init_params(text, request: gr.Request):
if request:
print("Request headers dictionary:", request.headers)
print("IP address:", request.client.host)
print("Query parameters:", dict(request.query_params))
# url = request.url
print("Request URL:", request.url)
youtube_link = ""
password_text = ""
admin = gr.update(visible=True)
reading_passage_admin = gr.update(visible=True)
summary_admin = gr.update(visible=True)
see_detail = gr.update(visible=True)
worksheet_accordion = gr.update(visible=True)
lesson_plan_accordion = gr.update(visible=True)
exit_ticket_accordion = gr.update(visible=True)
chatbot_open_ai_streaming = gr.update(visible=False)
chatbot_ai = gr.update(visible=False)
ai_chatbot_params = gr.update(visible=True)
# if youtube_link in query_params
if "youtube_id" in request.query_params:
youtube_id = request.query_params["youtube_id"]
youtube_link = f"https://www.youtube.com/watch?v={youtube_id}"
print(f"youtube_link: {youtube_link}")
# check if origin is from junyiacademy
origin = request.headers.get("origin", "")
if "junyiacademy" in origin:
password_text = "6161"
admin = gr.update(visible=False)
reading_passage_admin = gr.update(visible=False)
summary_admin = gr.update(visible=False)
see_detail = gr.update(visible=False)
worksheet_accordion = gr.update(visible=False)
lesson_plan_accordion = gr.update(visible=False)
exit_ticket_accordion = gr.update(visible=False)
ai_chatbot_params = gr.update(visible=False)
return admin, reading_passage_admin, summary_admin, see_detail, \
worksheet_accordion, lesson_plan_accordion, exit_ticket_accordion, \
password_text, youtube_link, \
chatbot_open_ai_streaming, chatbot_ai, ai_chatbot_params
def update_state(content_subject, content_grade, trascript, key_moments, questions_answers):
# inputs=[content_subject, content_grade, df_string_output],
# outputs=[content_subject_state, content_grade_state, trascript_state]
content_subject_state = content_subject
content_grade_state = content_grade
trascript_json = json.loads(trascript)
formatted_simple_transcript = create_formatted_simple_transcript(trascript_json)
trascript_state = formatted_simple_transcript
key_moments_state = key_moments
streaming_chat_thread_id_state = ""
questions_answers_json = json.loads(questions_answers)
question_1 = questions_answers_json[0]["question"]
question_2 = questions_answers_json[1]["question"]
question_3 = questions_answers_json[2]["question"]
ai_chatbot_question_1 = question_1
ai_chatbot_question_2 = question_2
ai_chatbot_question_3 = question_3
return content_subject_state, content_grade_state, trascript_state, key_moments_state, \
streaming_chat_thread_id_state, \
ai_chatbot_question_1, ai_chatbot_question_2, ai_chatbot_question_3
HEAD = """
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
svg.markmap {
width: 100%;
height: 100vh;
}
</style>
<script src="https://cdn.jsdelivr.net/npm/markmap-autoloader@0.15.2"></script>
<script>
const mind_map_tab_button = document.querySelector("#mind_map_tab-button");
if (mind_map_tab_button) {
mind_map_tab_button.addEventListener('click', function() {
const mind_map_markdown = document.querySelector("#mind_map_markdown > label > textarea");
if (mind_map_markdown) {
// When the tab button is clicked, log the current textarea value and re-render the mind map
console.log('Value changed to: ' + mind_map_markdown.value);
markmap.autoLoader.renderAll();
}
});
}
</script>
"""
with gr.Blocks(theme=gr.themes.Base(primary_hue=gr.themes.colors.orange, secondary_hue=gr.themes.colors.amber, text_size = gr.themes.sizes.text_lg), head=HEAD) as demo:
with gr.Row() as admin:
password = gr.Textbox(label="Password", type="password", elem_id="password_input", visible=True)
youtube_link = gr.Textbox(label="Enter YouTube Link", elem_id="youtube_link_input", visible=True)
video_id = gr.Textbox(label="video_id", visible=True)
# file_upload = gr.File(label="Upload your CSV or Word file", visible=False)
# web_link = gr.Textbox(label="Enter Web Page Link", visible=False)
user_data = gr.Textbox(label="User Data", elem_id="user_data_input", visible=True)
youtube_link_btn = gr.Button("Submit_YouTube_Link", elem_id="youtube_link_btn", visible=True)
with gr.Row() as data_state:
content_subject_state = gr.State() # gr.State holding content_subject
content_grade_state = gr.State() # gr.State holding content_grade
trascript_state = gr.State() # gr.State holding trascript
key_moments_state = gr.State() # gr.State holding key_moments
streaming_chat_thread_id_state = gr.State() # gr.State holding streaming_chat_thread_id
with gr.Tab("AIๅฐ็ฒพ้ˆ"):
with gr.Row():
all_chatbot_select_btn = gr.Button("้ธๆ“‡ AI ๅฐ็ฒพ้ˆ ๐Ÿ‘ˆ", elem_id="all_chatbot_select_btn", visible=False, variant="secondary", size="sm")
with gr.Row() as ai_chatbot_params:
ai_name = gr.Dropdown(
label="้ธๆ“‡ AI ๅŠฉ็†",
choices=[
("้ฃ›็‰น็ฒพ้ˆ","chatbot_open_ai"),
("้ฃ›็‰น้Ÿณ้€Ÿ","chatbot_open_ai_streaming"),
("ๆขจๆขจ","lili"),
("้บฅ้บฅ","maimai"),
("็‹็‹ธ่ฒ“","foxcat")
],
value="foxcat",
visible=True
)
ai_chatbot_ai_type = gr.Textbox(value="chat_completions", visible=True)
ai_chatbot_thread_id = gr.Textbox(label="thread_id", visible=True)
ai_chatbot_socratic_mode_btn = gr.Checkbox(label="่˜‡ๆ ผๆ‹‰ๅบ•ๅฎถๆ•™ๅŠฉ็†ๆจกๅผ", value=True, visible=True)
latex_delimiters = [{"left": "$", "right": "$", "display": False}]
with gr.Accordion("้ธๆ“‡ AI ๅฐ็ฒพ้ˆ", elem_id="chatbot_select_accordion") as chatbot_select_accordion:
with gr.Row():
user_avatar = "https://em-content.zobj.net/source/google/263/flushed-face_1f633.png"
# ้ฃ›็‰น็ฒพ้ˆ
with gr.Column(scale=1, variant="panel", visible=True):
vaitor_chatbot_avatar_url = "https://junyitopicimg.s3.amazonaws.com/s4byy--icon.jpe?v=20200513013523726"
vaitor_chatbot_avatar_images = gr.State([user_avatar, vaitor_chatbot_avatar_url])
vaitor_chatbot_description = """Hi๏ผŒๆˆ‘ๆ˜ฏไฝ ็š„AIๅญธไผดใ€้ฃ›็‰น็ฒพ้ˆใ€‘๏ผŒ\n
ๆˆ‘ๅฏไปฅ้™ชไฝ ไธ€่ตทๅญธ็ฟ’ๆœฌๆฌก็š„ๅ…งๅฎน๏ผŒๆœ‰ไป€้บผๅ•้กŒ้ƒฝๅฏไปฅๅ•ๆˆ‘ๅ–”๏ผ\n
๐Ÿค” ๅฆ‚ๆžœไฝ ไธ็Ÿฅ้“ๆ€Ž้บผ็™ผๅ•๏ผŒๅฏไปฅ้ปžๆ“Šๅทฆไธ‹ๆ–น็š„ๅ•้กŒไธ€ใ€ๅ•้กŒไบŒใ€ๅ•้กŒไธ‰๏ผŒๆˆ‘ๆœƒๅนซไฝ ็”Ÿๆˆๅ•้กŒ๏ผ\n
๐Ÿ—ฃ๏ธ ไนŸๅฏไปฅ้ปžๆ“Šๅณไธ‹ๆ–น็”จ่ชž้Ÿณ่ผธๅ…ฅ๏ผŒๆˆ‘ๆœƒๅนซไฝ ่ฝ‰ๆ›ๆˆๆ–‡ๅญ—๏ผŒๅŽฒๅฎณๅง๏ผ\n
๐Ÿ”  ๆˆ–ๆ˜ฏ็›ดๆŽฅ้ต็›ค่ผธๅ…ฅไฝ ็š„ๅ•้กŒ๏ผŒๆˆ‘ๆœƒ็›กๅŠ›ๅ›ž็ญ”ไฝ ็š„ๅ•้กŒๅ–”๏ผ\n
๐Ÿ’ค ไฝ†ๆˆ‘้‚„ๅœจๆˆ้•ท๏ผŒ้ซ”ๅŠ›ๆœ‰้™๏ผŒๆฏไธ€ๆฌกๅญธ็ฟ’ๅช่ƒฝๅ›ž็ญ”ๅๅ€‹ๅ•้กŒ๏ผŒ่ซ‹่ฎ“ๆˆ‘ไผ‘ๆฏไธ€ไธ‹ๅ†ๅ•ๅ•้กŒๅ–”๏ผ\n
๐Ÿฆ„ ๅฆ‚ๆžœ้”ๅˆฐไธŠ้™๏ผŒๆˆ–ๆ˜ฏ้‡ๅˆฐ็ฒพ้ˆๅพˆ็ดฏ๏ผŒ่ซ‹ๅ•ๅ•ๅ…ถไป–ๆœ‹ๅ‹๏ผŒๅƒๆ˜ฏ้ฃ›็‰น้Ÿณ้€Ÿ่ชช่ฉฑ็š„้€Ÿๅบฆๆฏ”่ผƒๅฟซ๏ผŒไฝ ๆ˜ฏๅฆ่ทŸๅพ—ไธŠๅ‘ข๏ผŸไฝ ไนŸๅฏไปฅๅ’Œๅ…ถไป–็ฒพ้ˆไบ’ๅ‹•็œ‹็œ‹ๅ–”๏ผ\n
"""
chatbot_open_ai_name = gr.State("chatbot_open_ai")
gr.Image(value=vaitor_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False)
vaitor_chatbot_select_btn = gr.Button("๐Ÿ‘†้ธๆ“‡ใ€้ฃ›็‰น็ฒพ้ˆใ€‘", elem_id="chatbot_btn", visible=True, variant="primary")
with gr.Accordion("๐Ÿฆ„ ้ฃ›็‰น็ฒพ้ˆ ๆ•˜่ฟฐ", open=False):
vaitor_chatbot_description_value = gr.Markdown(value=vaitor_chatbot_description, visible=True)
# ็‹็‹ธ่ฒ“
with gr.Column(scale=1, variant="panel"):
foxcat_chatbot_avatar_url = "https://storage.googleapis.com/wpassets.junyiacademy.org/1/2020/06/%E7%A7%91%E5%AD%B8%E5%BE%BD%E7%AB%A0-2-150x150.png"
foxcat_avatar_images = gr.State([user_avatar, foxcat_chatbot_avatar_url])
foxcat_chatbot_description = """Hi๏ผŒๆˆ‘ๆ˜ฏใ€็‹็‹ธ่ฒ“ใ€‘๏ผŒๅฏไปฅ้™ชไฝ ไธ€่ตทๅญธ็ฟ’ๆœฌๆฌก็š„ๅ…งๅฎน๏ผŒๆœ‰ไป€้บผๅ•้กŒ้ƒฝๅฏไปฅๅ•ๆˆ‘ๅ–”๏ผ\n
๐Ÿค” ไธ‰ๅนด็ดšๅญธ็”Ÿ๏ฝœ10 ๆญฒ๏ฝœ็”ท\n
๐Ÿ—ฃ๏ธ ๅฃ้ ญ็ฆช๏ผšใ€Œๆ„Ÿ่ฆบๅฅฝๅฅฝ็Žฉๅ–”๏ผใ€ใ€Œๅ’ฆ๏ผŸๆ˜ฏ้€™ๆจฃๅ—Ž๏ผŸใ€\n
๐Ÿ”  ่ˆˆ่ถฃ๏ผš็œ‹็Ÿฅ่ญ˜ๅž‹ๆ›ธ็ฑใ€็†ฑ่ก€็š„ๅ‹•ๆผซๅก้€šใ€ๆ–™็†ใ€็ˆฌๅฑฑใ€้จŽ่…ณ่ธ่ปŠใ€‚ๅ› ็‚บๅคชๅ–œๆญกๅƒ้ญšไบ†๏ผŒๆญฃๅŠชๅŠ›ๅ’Œ็ˆธ็ˆธๅญธ็ฟ’้‡ฃ้ญšใ€ๆ–™็†้ญšๅŠๅ„็จฎๆœ‰้—œ้ญš็š„็Ÿฅ่ญ˜๏ผŒๆœ€่จŽๅŽญ็š„้ฃŸ็‰ฉๆ˜ฏ้’ๆค’ใ€‚\n
๐Ÿ’ค ๅ€‹ๆ€ง๏ผšๅ–œๆญกๅญธ็ฟ’ๆ–ฐ็Ÿฅ๏ผŒๆ“ๆœ‰ๆœ€ๆ—บ็››็š„ๅฅฝๅฅ‡ๅฟƒ๏ผŒๅฎถ่ฃกๅ †ๆปฟ็™พ็ง‘ๅ…จๆ›ธ๏ผŒไพ‹ๅฆ‚๏ผšๅœ‹ๅฎถๅœฐ็†้ ป้“ๅ‡บ็‰ˆ็š„ใ€Œ็ต‚ๆฅต้ญš็™พ็ง‘ใ€๏ผŒ้›–้ƒฝๆฒ’ๆœ‰็œ‹ๅฎŒ๏ผŒๅธธๅธธ่ขซๆขจๆขจๅ”ธๆ˜ฏไธ‰ๅˆ†้˜็†ฑๅบฆ๏ผŒไฝ†ๆ˜ฏไนŸไธ€้ปžไธ€้ปžๅญธ็ฟ’ๅˆฐไธๅŒ้ ˜ๅŸŸ็š„็Ÿฅ่ญ˜ใ€‚้›–็„ถๆœ‰ๆ™‚ๆœƒๅฟ˜ๆฑๅฟ˜่ฅฟ๏ผŒไฝ†่ช็œŸ่ตทไพ†ไนŸๆ˜ฏๅพˆๅฏ้ ๏ผŒ็ญ”ๆ‡‰็š„ไบ‹็ต•ๅฐไฝฟๅ‘ฝๅฟ…้”ใ€‚้‡ๅˆฐๆŒ‘ๆˆฐๆ™‚๏ผŒๅ‹‡ๆ–ผ่ทณๅ‡บ่ˆ’้ฉๅœˆ๏ผŒ่ฟฝๆฑ‚่‡ชๆˆ‘ๆ”น่ฎŠ๏ผŒ่ฆ–ๅ›ฐ้›ฃ็‚บๆˆ้•ท็š„ๆฉŸๆœƒใ€‚
"""
foxcat_chatbot_name = gr.State("foxcat")
gr.Image(value=foxcat_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False)
foxcat_chatbot_select_btn = gr.Button("๐Ÿ‘†้ธๆ“‡ใ€็‹็‹ธ่ฒ“ใ€‘", visible=True, variant="primary", elem_classes="chatbot_select_btn")
with gr.Accordion("๐Ÿ’œ ็‹็‹ธ่ฒ“ ๆ•˜่ฟฐ", open=False):
foxcat_chatbot_description_value = gr.Markdown(value=foxcat_chatbot_description, visible=True)
# ๆขจๆขจ
with gr.Column(scale=1, variant="panel"):
lili_chatbot_avatar_url = "https://junyitopicimg.s3.amazonaws.com/live/v1283-new-topic-44-icon.png?v=20230529071206714"
lili_avatar_images = gr.State([user_avatar, lili_chatbot_avatar_url])
lili_chatbot_description = """ไฝ ๅฅฝ๏ผŒๆˆ‘ๆ˜ฏๆบซๆŸ”็š„ใ€ๆขจๆขจใ€‘๏ผŒๅพˆ้ซ˜่ˆˆๅฏไปฅๅœจ้€™่ฃก้™ชไผดไฝ ๅญธ็ฟ’ใ€‚ๅฆ‚ๆžœไฝ ๆœ‰ไปปไฝ•็–‘ๅ•๏ผŒ่ซ‹้šจๆ™‚ๅ‘ๆˆ‘ๆๅ‡บๅ“ฆ๏ผ \n
๐Ÿค” ไธ‰ๅนด็ดšๅญธ็”Ÿ๏ฝœ10 ๆญฒ๏ฝœๅฅณ\n
๐Ÿ—ฃ๏ธ ๅฃ้ ญ็ฆช๏ผšใ€Œ็œŸ็š„ๅ‡็š„๏ผŸ๏ผใ€ใ€Œ่ฎ“ๆˆ‘ๆƒณไธ€ๆƒณๅ–”ใ€ใ€Œไฝ ็œ‹ๅง๏ผๅคงๅ•้กŒๆ‹†่งฃๆˆๅฐๅ•้กŒ๏ผŒๅฐฑ่ฎŠๅพ—็ฐกๅ–ฎๅ•ฆ๏ผใ€ใ€Œๆททๆททๅ™ฉๅ™ฉ็š„็”Ÿๆดปไธๅ€ผๅพ—้Žใ€\n
๐Ÿ”  ่ˆˆ่ถฃ๏ผš็ƒ˜็„™้ค…ไนพ๏ผˆ็ˆถๆฏ้–‹็ณ•้ค…ๅบ—๏ผ‰ใ€็•ซ็•ซใ€่ฝๆต่กŒ้Ÿณๆจ‚ใ€ๆ”ถ็ดใ€‚\n
๐Ÿ’ค ๅ€‹ๆ€ง๏ผš
- ๅ…งๅ‘ๅฎณ็พž๏ผŒๆฏ”่ตทๅ‡บๅŽป็Žฉๆ›ดๅ–œๆญกๅพ…ๅœจๅฎถ๏ผˆ้™ค้žๆ˜ฏ่ทŸ็‹็‹ธ่ฒ“ๅ‡บๅŽป็Žฉ๏ผ‰
- ๆ•ธ็†้‚่ผฏๅพˆๅฅฝ๏ผ›ๅ…ถๅฏฆ่ฆบๅพ—้บฅ้บฅ้€ฃ็ ็‚ฎ็š„ๆๅ•ๆœ‰้ปž็…ฉ๏ผŒไฝ†้‚„ๆ˜ฏๆœƒ่€ๅฟƒๅœฐๅ›ž็ญ”
- ๆœ‰้ฉšไบบ็š„็œผๅŠ›๏ผŒ็ธฝ่ƒฝ่ง€ๅฏŸๅˆฐๅ…ถไป–ไบบๆฒ’ๆœ‰ๅฏŸ่ฆบ็š„็ดฐ็ฏ€
- ๅ–œๆญกๆ•ดๆ•ด้ฝŠ้ฝŠ็š„็’ฐๅขƒ๏ผŒๆ‰€ไปฅไธ€ๅˆฐ้บฅ้บฅๅฎถๅฐฑๅ—ไธไบ†
"""
lili_chatbot_name = gr.State("lili")
gr.Image(value=lili_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False)
lili_chatbot_select_btn = gr.Button("๐Ÿ‘†้ธๆ“‡ใ€ๆขจๆขจใ€‘", visible=True, variant="primary", elem_classes="chatbot_select_btn")
with gr.Accordion("๐Ÿงก ๆขจๆขจ ๆ•˜่ฟฐ", open=False):
lili_chatbot_description_value = gr.Markdown(value=lili_chatbot_description, visible=True)
# ้บฅ้บฅ
with gr.Column(scale=1, variant="panel"):
maimai_chatbot_avatar_url = "https://storage.googleapis.com/wpassets.junyiacademy.org/1/2020/07/%E6%80%9D%E8%80%83%E5%8A%9B%E8%B6%85%E4%BA%BA%E5%BE%BD%E7%AB%A0_%E5%B7%A5%E4%BD%9C%E5%8D%80%E5%9F%9F-1-%E8%A4%87%E6%9C%AC-150x150.png"
maimai_avatar_images = gr.State([user_avatar, maimai_chatbot_avatar_url])
maimai_chatbot_description = """Hi๏ผŒๆˆ‘ๆ˜ฏ่ฟทไบบ็š„ใ€้บฅ้บฅใ€‘๏ผŒๆˆ‘ๅœจ้€™่ฃก็ญ‰่‘—ๅ’Œไฝ ไธ€่ตทๆŽข็ดขๆ–ฐ็Ÿฅ๏ผŒไปปไฝ•็–‘ๅ•้ƒฝๅฏไปฅๅ‘ๆˆ‘ๆๅ‡บ๏ผ\n
๐Ÿค” ไธ‰ๅนด็ดšๅญธ็”Ÿ๏ฝœ10 ๆญฒ๏ฝœ็”ท\n
๐Ÿ—ฃ๏ธ ๅฃ้ ญ็ฆช๏ผšใ€ŒOh My God!ใ€ใ€Œๅฅฝๅฅ‡ๆ€ชๅ–”๏ผใ€ใ€Œๅ–”๏ผๅŽŸไพ†ๆ˜ฏ้€™ๆจฃๅ•Š๏ผใ€\n
๐Ÿ”  ่ˆˆ่ถฃ๏ผšๆœ€ๆ„›ๅŽป้‡Žๅค–็Žฉ่€๏ผˆๅฟƒๆƒ…ๅฅฝๆ™‚ๆœƒ้ †ไพฟๆ•้ญš้€็ตฆ็‹็‹ธ่ฒ“๏ผ‰๏ผŒๅ–œๆญก่ฌ›ๅ†ท็ฌ‘่ฉฑใ€ๆƒกไฝœๅŠ‡ใ€‚ๅ› ็‚บๅคชๅ–œๆญก็Žฉๅ…ท๏ผŒ่€Œ้–‹ๅง‹่‡ชๅทฑๅš็Žฉๅ…ท๏ผŒๅฎถ่ฃกๅฐฑๅฅฝๅƒไป–็š„้Šๆจ‚ๅ ดใ€‚\n
๐Ÿ’ค ๅ€‹ๆ€ง๏ผšๅ–œๆญกๅ•ๅ•้กŒ๏ผŒๅฐฑ็ฎ—่ขซๆขจๆขจใ„˜ใ„Ÿ๏ผŒไนŸ้‚„ๆ˜ฏ็…งๅ•๏ฝœๆ†จๅŽš๏ผŒๅค–ๅ‘ๅฅฝๅ‹•๏ผŒๆจ‚ๅคฉ้–‹ๆœ—๏ผŒไธๆœƒ่ขซ้›ฃ้กŒๆ‰“ๆ•—๏ฝœๅ–œๆญกๆ”ถ้›†ๅ„ๅผๅ„ๆจฃ็š„ๆฑ่ฅฟ๏ผ›ๆˆฟ้–“ๅชๆœ‰ๅœจๆ•ด็†็š„้‚ฃไธ€ๅคฉๆœ€ไนพๆทจ
"""
maimai_chatbot_name = gr.State("maimai")
gr.Image(value=maimai_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False)
maimai_chatbot_select_btn = gr.Button("๐Ÿ‘†้ธๆ“‡ใ€้บฅ้บฅใ€‘", visible=True, variant="primary", elem_classes="chatbot_select_btn")
with gr.Accordion("๐Ÿ’™ ้บฅ้บฅ ๆ•˜่ฟฐ", open=False):
maimai_chatbot_description_value = gr.Markdown(value=maimai_chatbot_description, visible=True)
# ้ฃ›็‰น้Ÿณ้€Ÿ
with gr.Column(scale=1, variant="panel", visible=True):
streaming_chatbot_avatar_url = "https://storage.googleapis.com/wpassets.junyiacademy.org/1/2020/11/1-%E6%98%9F%E7%A9%BA%E9%A0%AD%E8%B2%BC-%E5%A4%AA%E7%A9%BA%E7%8B%90%E7%8B%B8%E8%B2%93-150x150.png"
streaming_chatbot_description = """Hi๏ผŒๆˆ‘ๆ˜ฏใ€้ฃ›็‰น้Ÿณ้€Ÿใ€‘๏ผŒ \n
่ชช่ฉฑๆฏ”่ผƒๅฟซ๏ผŒไฝ†ๆœ‰ไป€้บผๅ•้กŒ้ƒฝๅฏไปฅๅ•ๆˆ‘ๅ–”๏ผ \n
๐Ÿš€ ๆˆ‘ๆฒ’ๆœ‰้ ่จญๅ•้กŒใ€ไนŸๆฒ’ๆœ‰่ชž้Ÿณ่ผธๅ…ฅ๏ผŒ้ฉๅˆๅฟซๅ•ๅฟซ็ญ”๏ผŒไธ€่ตท็ทด็ฟ’ๅ•ๅ‡บๅฅฝๅ•้กŒๅง \n
๐Ÿ”  ๆ“…้•ท็”จๆ–‡ๅญ—่กจ้”็š„ไฝ ๏ผŒๅฏไปฅ็”จ้ต็›ค่ผธๅ…ฅไฝ ็š„ๅ•้กŒ๏ผŒๆˆ‘ๆœƒ็›กๅŠ›ๅ›ž็ญ”ไฝ ็š„ๅ•้กŒๅ–”\n
๐Ÿ’ค ๆˆ‘้‚„ๅœจๆˆ้•ท๏ผŒ้ซ”ๅŠ›ๆœ‰้™๏ผŒๆฏไธ€ๆฌกๅญธ็ฟ’ๅช่ƒฝๅ›ž็ญ”ๅๅ€‹ๅ•้กŒ๏ผŒ่ซ‹่ฎ“ๆˆ‘ไผ‘ๆฏไธ€ไธ‹ๅ†ๅ•ๅ•้กŒๅ–”๏ฝž
"""
chatbot_open_ai_streaming_name = gr.State("chatbot_open_ai_streaming")
gr.Image(value=streaming_chatbot_avatar_url, height=100, width=100, show_label=False, show_download_button=False)
chatbot_open_ai_streaming_select_btn = gr.Button("๐Ÿ‘†้ธๆ“‡ใ€้ฃ›็‰น้Ÿณ้€Ÿใ€‘", elem_id="streaming_chatbot_btn", visible=True, variant="primary")
with gr.Accordion("๐Ÿš€ ้ฃ›็‰น้Ÿณ้€Ÿ ๆ•˜่ฟฐ", open=False):
gr.Markdown(value=streaming_chatbot_description, visible=True)
# Not yet released
with gr.Column(scale=1, variant="panel"):
gr.Markdown(value="### ๅฐšๆœช้–‹ๆ”พ", visible=True)
with gr.Row("้ฃ›็‰น้Ÿณ้€Ÿ") as chatbot_open_ai_streaming:
with gr.Column():
streaming_chat_greeting = """
Hi๏ผŒๆˆ‘ๆ˜ฏใ€้ฃ›็‰น้Ÿณ้€Ÿใ€‘๏ผŒ่ชช่ฉฑๆฏ”่ผƒๅฟซ๏ผŒไฝ†ๆœ‰ไป€้บผๅ•้กŒ้ƒฝๅฏไปฅๅ•ๆˆ‘ๅ–”๏ผ \n
๐Ÿš€ ๆˆ‘ๆฒ’ๆœ‰้ ่จญๅ•้กŒใ€ไนŸๆฒ’ๆœ‰่ชž้Ÿณ่ผธๅ…ฅ๏ผŒ้ฉๅˆๅฟซๅ•ๅฟซ็ญ”็š„ไฝ  \n
๐Ÿ”  ้ต็›ค่ผธๅ…ฅไฝ ็š„ๅ•้กŒ๏ผŒๆˆ‘ๆœƒ็›กๅŠ›ๅ›ž็ญ”ไฝ ็š„ๅ•้กŒๅ–”๏ผ\n
๐Ÿ’ค ๆˆ‘้‚„ๅœจๆˆ้•ท๏ผŒ้ซ”ๅŠ›ๆœ‰้™๏ผŒๆฏไธ€ๆฌกๅญธ็ฟ’ๅช่ƒฝๅ›ž็ญ”ๅๅ€‹ๅ•้กŒ๏ผŒ่ซ‹่ฎ“ๆˆ‘ไผ‘ๆฏไธ€ไธ‹ๅ†ๅ•ๅ•้กŒๅ–”๏ผ
"""
additional_inputs = [password, video_id, user_data, streaming_chat_thread_id_state, trascript_state, key_moments_state, content_subject_state, content_grade_state, ai_chatbot_socratic_mode_btn]
streaming_chat = gr.ChatInterface(
fn=chat_with_opan_ai_assistant_streaming,
additional_inputs=additional_inputs,
submit_btn="้€ๅ‡บ",
retry_btn=None,
undo_btn="โช ไธŠไธ€ๆญฅ",
clear_btn="๐Ÿ—‘๏ธ ๆธ…้™คๅ…จ้ƒจ",
stop_btn=None,
description=streaming_chat_greeting
)
with gr.Row("ไธ€่ˆฌ็ฒพ้ˆ") as chatbot_ai:
with gr.Column():
ai_chatbot_greeting = [[
"่ซ‹ๅ•ไฝ ๆ˜ฏ่ชฐ๏ผŸ",
"""Hi๏ผŒๆˆ‘ๆ˜ฏ้ฃ›็‰น็ฒพ้ˆ็š„ๆœ‹ๅ‹ๅ€‘ใ€ๆขจๆขจใ€้บฅ้บฅใ€็‹็‹ธ่ฒ“ใ€‘๏ผŒไนŸๅฏไปฅ้™ชไฝ ไธ€่ตทๅญธ็ฟ’ๆœฌๆฌก็š„ๅ…งๅฎน๏ผŒๆœ‰ไป€้บผๅ•้กŒ้ƒฝๅฏไปฅๅ•ๆˆ‘ๅ–”๏ผ
๐Ÿค” ๅฆ‚ๆžœไฝ ไธ็Ÿฅ้“ๆ€Ž้บผ็™ผๅ•๏ผŒๅฏไปฅ้ปžๆ“Šๅทฆไธ‹ๆ–น็š„ๅ•้กŒไธ€ใ€ๅ•้กŒไบŒใ€ๅ•้กŒไธ‰๏ผŒๆˆ‘ๆœƒๅนซไฝ ็”Ÿๆˆๅ•้กŒ๏ผ
๐Ÿ—ฃ๏ธ ไนŸๅฏไปฅ้ปžๆ“Šๅณไธ‹ๆ–น็”จ่ชž้Ÿณ่ผธๅ…ฅ๏ผŒๆˆ‘ๆœƒๅนซไฝ ่ฝ‰ๆ›ๆˆๆ–‡ๅญ—๏ผŒๅŽฒๅฎณๅง๏ผ
๐Ÿ”  ๆˆ–ๆ˜ฏ็›ดๆŽฅ้ต็›ค่ผธๅ…ฅไฝ ็š„ๅ•้กŒ๏ผŒๆˆ‘ๆœƒ็›กๅŠ›ๅ›ž็ญ”ไฝ ็š„ๅ•้กŒๅ–”๏ผ
๐Ÿ’ค ็ฒพ้ˆๅ€‘้ซ”ๅŠ›้ƒฝๆœ‰้™๏ผŒๆฏไธ€ๆฌกๅญธ็ฟ’ๅช่ƒฝๅ›ž็ญ”ๅๅ€‹ๅ•้กŒ๏ผŒ่ซ‹่ฎ“ๆˆ‘ไผ‘ๆฏไธ€ไธ‹ๅ†ๅ•ๅ•้กŒๅ–”๏ผ
""",
]]
with gr.Row():
ai_chatbot = gr.Chatbot(label="ai_chatbot", show_share_button=False, likeable=True, show_label=False, latex_delimiters=latex_delimiters, value=ai_chatbot_greeting)
with gr.Row():
with gr.Accordion("ไฝ ไนŸๆœ‰้กžไผผ็š„ๅ•้กŒๆƒณๅ•ๅ—Ž๏ผŸ ่ซ‹ๆŒ‰ไธ‹ โ—€๏ธŽ", open=False) as ask_questions_accordion_2:
ai_chatbot_question_1 = gr.Button("ๅ•้กŒไธ€")
ai_chatbot_question_2 = gr.Button("ๅ•้กŒไบŒ")
ai_chatbot_question_3 = gr.Button("ๅ•้กŒไธ‰")
create_questions_btn = gr.Button("็”Ÿๆˆๅ•้กŒ", variant="primary")
ai_chatbot_audio_input = gr.Audio(sources=["microphone"], type="filepath", max_length=60, label="่ชž้Ÿณ่ผธๅ…ฅ")
with gr.Row():
ai_msg = gr.Textbox(label="่จŠๆฏ่ผธๅ…ฅ",scale=3)
ai_send_button = gr.Button("้€ๅ‡บ", variant="primary",scale=1)
ai_send_feedback_btn = gr.Button("ๆๅ•ๅŠ›ๅ›ž้ฅ‹", variant="primary", scale=1, visible=False)
with gr.Tab("ๆ–‡็ซ ๆจกๅผ"):
with gr.Row():
reading_passage = gr.Markdown(show_label=False, latex_delimiters = [{"left": "$", "right": "$", "display": False}])
reading_passage_speak_button = gr.Button("Speak", visible=False)
reading_passage_audio_output = gr.Audio(label="Audio Output", visible=False)
with gr.Tab("้‡้ปžๆ‘˜่ฆ"):
with gr.Row():
df_summarise = gr.Markdown(show_label=False, latex_delimiters = [{"left": "$", "right": "$", "display": False}])
with gr.Tab("้—œ้ตๆ™‚ๅˆป"):
with gr.Row():
key_moments_html = gr.HTML(value="")
with gr.Tab("ๆ•™ๅญธๅ‚™่ชฒ"):
with gr.Row():
content_subject = gr.Dropdown(label="้ธๆ“‡ไธป้กŒ", choices=["ๆ•ธๅญธ", "่‡ช็„ถ", "ๅœ‹ๆ–‡", "่‹ฑๆ–‡", "็คพๆœƒ","็‰ฉ็†", "ๅŒ–ๅญธ", "็”Ÿ็‰ฉ", "ๅœฐ็†", "ๆญทๅฒ", "ๅ…ฌๆฐ‘"], value="", visible=False)
content_grade = gr.Dropdown(label="้ธๆ“‡ๅนด็ดš", choices=["ไธ€ๅนด็ดš", "ไบŒๅนด็ดš", "ไธ‰ๅนด็ดš", "ๅ››ๅนด็ดš", "ไบ”ๅนด็ดš", "ๅ…ญๅนด็ดš", "ไธƒๅนด็ดš", "ๅ…ซๅนด็ดš", "ไนๅนด็ดš", "ๅๅนด็ดš", "ๅไธ€ๅนด็ดš", "ๅไบŒๅนด็ดš"], value="", visible=False)
content_level = gr.Dropdown(label="ๅทฎ็•ฐๅŒ–ๆ•™ๅญธ", choices=["ๅŸบ็คŽ", "ไธญ็ดš", "้€ฒ้šŽ"], value="ๅŸบ็คŽ")
with gr.Row():
with gr.Tab("ๅญธ็ฟ’ๅ–ฎ"):
with gr.Row():
with gr.Column(scale=1):
with gr.Row():
worksheet_content_type_name = gr.Textbox(value="worksheet", visible=False)
worksheet_algorithm = gr.Dropdown(label="้ธๆ“‡ๆ•™ๅญธ็ญ–็•ฅๆˆ–็†่ซ–", choices=["Bloom่ช็Ÿฅ้šŽๅฑค็†่ซ–", "Polyaๆ•ธๅญธ่งฃ้กŒๆณ•", "CRAๆ•™ๅญธๆณ•"], value="Bloom่ช็Ÿฅ้šŽๅฑค็†่ซ–", visible=False)
worksheet_content_btn = gr.Button("็”Ÿๆˆๅญธ็ฟ’ๅ–ฎ ๐Ÿ“„", variant="primary", visible=True)
with gr.Accordion("ๅพฎ่ชฟ", open=False):
worksheet_exam_result_fine_tune_prompt = gr.Textbox(label="ๆ นๆ“š็ตๆžœ๏ผŒ่ผธๅ…ฅไฝ ๆƒณๆ›ดๆ”น็š„ๆƒณๆณ•")
worksheet_exam_result_fine_tune_btn = gr.Button("ๅพฎ่ชฟ็ตๆžœ", variant="primary")
worksheet_exam_result_retrun_original = gr.Button("่ฟ”ๅ›žๅŽŸๅง‹็ตๆžœ")
with gr.Accordion("prompt", open=False) as worksheet_accordion:
worksheet_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40)
with gr.Column(scale=2):
# ็”Ÿๆˆๅฐๆ‡‰ไธๅŒๆจกๅผ็š„็ตๆžœ
worksheet_exam_result_prompt = gr.Textbox(visible=False)
worksheet_exam_result_original = gr.Textbox(visible=False)
# worksheet_exam_result = gr.Textbox(label="ๅˆๆฌก็”Ÿๆˆ็ตๆžœ", show_copy_button=True, interactive=True, lines=40)
worksheet_exam_result = gr.Markdown(label="ๅˆๆฌก็”Ÿๆˆ็ตๆžœ", latex_delimiters = [{"left": "$", "right": "$", "display": False}])
worksheet_download_exam_result_button = gr.Button("่ฝ‰ๆˆ word๏ผŒๅฎŒๆˆๅพŒ่ซ‹้ปžๆ“Šๅณไธ‹่ง’ download ๆŒ‰้ˆ•", variant="primary")
worksheet_exam_result_word_link = gr.File(label="Download Word")
with gr.Tab("ๆ•™ๆกˆ"):
with gr.Row():
with gr.Column(scale=1):
with gr.Row():
lesson_plan_content_type_name = gr.Textbox(value="lesson_plan", visible=False)
lesson_plan_time = gr.Slider(label="้ธๆ“‡่ชฒ็จ‹ๆ™‚้–“(ๅˆ†้˜)", minimum=10, maximum=120, step=5, value=40)
lesson_plan_btn = gr.Button("็”Ÿๆˆๆ•™ๆกˆ ๐Ÿ“•", variant="primary", visible=True)
with gr.Accordion("ๅพฎ่ชฟ", open=False):
lesson_plan_exam_result_fine_tune_prompt = gr.Textbox(label="ๆ นๆ“š็ตๆžœ๏ผŒ่ผธๅ…ฅไฝ ๆƒณๆ›ดๆ”น็š„ๆƒณๆณ•")
lesson_plan_exam_result_fine_tune_btn = gr.Button("ๅพฎ่ชฟ็ตๆžœ", variant="primary")
lesson_plan_exam_result_retrun_original = gr.Button("่ฟ”ๅ›žๅŽŸๅง‹็ตๆžœ")
with gr.Accordion("prompt", open=False) as lesson_plan_accordion:
lesson_plan_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40)
with gr.Column(scale=2):
# ็”Ÿๆˆๅฐๆ‡‰ไธๅŒๆจกๅผ็š„็ตๆžœ
lesson_plan_exam_result_prompt = gr.Textbox(visible=False)
lesson_plan_exam_result_original = gr.Textbox(visible=False)
lesson_plan_exam_result = gr.Markdown(label="ๅˆๆฌก็”Ÿๆˆ็ตๆžœ", latex_delimiters = [{"left": "$", "right": "$", "display": False}])
lesson_plan_download_exam_result_button = gr.Button("่ฝ‰ๆˆ word๏ผŒๅฎŒๆˆๅพŒ่ซ‹้ปžๆ“Šๅณไธ‹่ง’ download ๆŒ‰้ˆ•", variant="primary")
lesson_plan_exam_result_word_link = gr.File(label="Download Word")
with gr.Tab("ๅ‡บๅ ดๅˆธ"):
with gr.Row():
with gr.Column(scale=1):
with gr.Row():
exit_ticket_content_type_name = gr.Textbox(value="exit_ticket", visible=False)
exit_ticket_time = gr.Slider(label="้ธๆ“‡ๅ‡บๅ ดๅˆธๆ™‚้–“(ๅˆ†้˜)", minimum=5, maximum=10, step=1, value=8)
exit_ticket_btn = gr.Button("็”Ÿๆˆๅ‡บๅ ดๅˆธ ๐ŸŽŸ๏ธ", variant="primary", visible=True)
with gr.Accordion("ๅพฎ่ชฟ", open=False):
exit_ticket_exam_result_fine_tune_prompt = gr.Textbox(label="ๆ นๆ“š็ตๆžœ๏ผŒ่ผธๅ…ฅไฝ ๆƒณๆ›ดๆ”น็š„ๆƒณๆณ•")
exit_ticket_exam_result_fine_tune_btn = gr.Button("ๅพฎ่ชฟ็ตๆžœ", variant="primary")
exit_ticket_exam_result_retrun_original = gr.Button("่ฟ”ๅ›žๅŽŸๅง‹็ตๆžœ")
with gr.Accordion("prompt", open=False) as exit_ticket_accordion:
exit_ticket_prompt = gr.Textbox(label="worksheet_prompt", show_copy_button=True, lines=40)
with gr.Column(scale=2):
# ็”Ÿๆˆๅฐๆ‡‰ไธๅŒๆจกๅผ็š„็ตๆžœ
exit_ticket_exam_result_prompt = gr.Textbox(visible=False)
exit_ticket_exam_result_original = gr.Textbox(visible=False)
exit_ticket_exam_result = gr.Markdown(label="ๅˆๆฌก็”Ÿๆˆ็ตๆžœ", latex_delimiters = [{"left": "$", "right": "$", "display": False}])
exit_ticket_download_exam_result_button = gr.Button("่ฝ‰ๆˆ word๏ผŒๅฎŒๆˆๅพŒ่ซ‹้ปžๆ“Šๅณไธ‹่ง’ download ๆŒ‰้ˆ•", variant="primary")
exit_ticket_exam_result_word_link = gr.File(label="Download Word")
# with gr.Tab("็ด ้คŠๅฐŽๅ‘้–ฑ่ฎ€้กŒ็ต„"):
# literacy_oriented_reading_content = gr.Textbox(label="่ผธๅ…ฅ้–ฑ่ฎ€ๆๆ–™")
# literacy_oriented_reading_content_btn = gr.Button("็”Ÿๆˆ้–ฑ่ฎ€็†่งฃ้กŒ")
# with gr.Tab("่‡ชๆˆ‘่ฉ•ไผฐ"):
# self_assessment_content = gr.Textbox(label="่ผธๅ…ฅ่‡ช่ฉ•ๅ•ๅทๆˆ–ๆชขๆŸฅ่กจ")
# self_assessment_content_btn = gr.Button("็”Ÿๆˆ่‡ช่ฉ•ๅ•ๅท")
# with gr.Tab("่‡ชๆˆ‘ๅๆ€่ฉ•้‡"):
# self_reflection_content = gr.Textbox(label="่ผธๅ…ฅ่‡ชๆˆ‘ๅๆ€ๆดปๅ‹•")
# self_reflection_content_btn = gr.Button("็”Ÿๆˆ่‡ชๆˆ‘ๅๆ€ๆดปๅ‹•")
# with gr.Tab("ๅพŒ่จญ่ช็Ÿฅ"):
# metacognition_content = gr.Textbox(label="่ผธๅ…ฅๅพŒ่จญ่ช็Ÿฅ็›ธ้—œๅ•้กŒ")
# metacognition_content_btn = gr.Button("็”ŸๆˆๅพŒ่จญ่ช็Ÿฅๅ•้กŒ")
with gr.Accordion("See Details", open=False) as see_details:
with gr.Tab("้€ๅญ—็จฟๆœฌๆ–‡"):
with gr.Row() as transcript_admmin:
transcript_kind = gr.Textbox(value="transcript", show_label=False)
transcript_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
transcript_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
transcript_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
transcript_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
transcript_create_button = gr.Button("้‡ๅปบ", size="sm", variant="primary")
with gr.Row():
df_string_output = gr.Textbox(lines=40, label="Data Text", interactive=False, show_copy_button=True)
with gr.Tab("ๆ–‡็ซ ๆœฌๆ–‡"):
with gr.Row() as reading_passage_admin:
with gr.Column():
with gr.Row():
reading_passage_kind = gr.Textbox(value="reading_passage_latex", show_label=False)
with gr.Row():
# reading_passage_text_to_latex = gr.Button("ๆ–ฐๅขž LaTeX", size="sm", variant="primary")
reading_passage_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
reading_passage_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
reading_passage_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
reading_passage_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
reading_passage_create_button = gr.Button("้‡ๅปบ", size="sm", variant="primary")
with gr.Row():
reading_passage_text = gr.Textbox(label="reading_passage_latex", lines=40, interactive=False, show_copy_button=True)
with gr.Tab("้‡้ปžๆ‘˜่ฆๆœฌๆ–‡"):
with gr.Row() as summary_admmin:
with gr.Column():
with gr.Row():
summary_kind = gr.Textbox(value="summary_markdown", show_label=False)
with gr.Row():
# summary_to_markdown = gr.Button("ๆ–ฐๅขž Markdown", size="sm", variant="primary")
summary_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
summary_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
summary_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
summary_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
summary_create_button = gr.Button("้‡ๅปบ", size="sm", variant="primary")
with gr.Row():
summary_text = gr.Textbox(label="summary_markdown", lines=40, interactive=False, show_copy_button=True)
with gr.Tab("้—œ้ตๆ™‚ๅˆปๆœฌๆ–‡"):
with gr.Row() as key_moments_admin:
key_moments_kind = gr.Textbox(value="key_moments", show_label=False)
key_moments_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
key_moments_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
key_moments_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
key_moments_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
key_moments_create_button = gr.Button("้‡ๅปบ", size="sm", variant="primary")
with gr.Row():
key_moments = gr.Textbox(label="Key Moments", lines=40, interactive=False, show_copy_button=True)
with gr.Tab("ๅ•้กŒๆœฌๆ–‡"):
with gr.Row() as question_list_admin:
questions_kind = gr.Textbox(value="questions", show_label=False)
questions_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
questions_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
questions_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
questions_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
questions_create_button = gr.Button("้‡ๅปบ", size="sm", variant="primary")
with gr.Row():
questions_json = gr.Textbox(label="Questions", lines=40, interactive=False, show_copy_button=True)
with gr.Tab("ๅ•้กŒ็ญ”ๆกˆๆœฌๆ–‡"):
with gr.Row() as questions_answers_admin:
questions_answers_kind = gr.Textbox(value="questions_answers", show_label=False)
questions_answers_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
questions_answers_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
questions_answers_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
questions_answers_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
questions_answers_create_button = gr.Button("้‡ๅปบ", size="sm", variant="primary")
with gr.Row():
questions_answers_json = gr.Textbox(label="Questions Answers", lines=40, interactive=False, show_copy_button=True)
with gr.Tab("ๆ•™ๅญธๅ‚™่ชฒ"):
with gr.Row() as worksheet_admin:
worksheet_kind = gr.Textbox(value="ai_content_list", show_label=False)
worksheet_get_button = gr.Button("ๅ–ๅพ—", size="sm", variant="primary")
worksheet_edit_button = gr.Button("็ทจ่ผฏ", size="sm", variant="primary")
worksheet_update_button = gr.Button("ๅ„ฒๅญ˜", size="sm", variant="primary")
worksheet_delete_button = gr.Button("ๅˆช้™ค", size="sm", variant="primary")
worksheet_create_button = gr.Button("้‡ๅปบ(X)", size="sm", variant="primary", interactive=False)
with gr.Row():
worksheet_json = gr.Textbox(label="worksheet", lines=40, interactive=False, show_copy_button=True)
with gr.Tab("้€ๅญ—็จฟ"):
simple_html_content = gr.HTML(label="Simple Transcript")
with gr.Tab("ๅœ–ๆ–‡"):
transcript_html = gr.HTML(label="YouTube Transcript and Video")
with gr.Tab("ๆŠ•ๅฝฑ็‰‡"):
slide_image = gr.Image()
slide_text = gr.Textbox()
with gr.Row():
prev_button = gr.Button("Previous")
next_button = gr.Button("Next")
prev_button.click(fn=prev_slide, inputs=[], outputs=[slide_image, slide_text])
next_button.click(fn=next_slide, inputs=[], outputs=[slide_image, slide_text])
with gr.Tab("markdown"):
gr.Markdown("## ่ซ‹่ค‡่ฃฝไปฅไธ‹ markdown ไธฆ่ฒผๅˆฐไฝ ็š„ๅฟƒๆ™บๅœ–ๅทฅๅ…ทไธญ๏ผŒๅปบ่ญฐไฝฟ็”จ๏ผšhttps://markmap.js.org/repl")
mind_map = gr.Textbox(container=True, show_copy_button=True, lines=40, elem_id="mind_map_markdown")
with gr.Tab("ๅฟƒๆ™บๅœ–",elem_id="mind_map_tab"):
mind_map_html = gr.HTML()
# OPEN AI CHATBOT SELECT
chatbot_select_outputs = [
chatbot_select_accordion,
all_chatbot_select_btn,
chatbot_open_ai_streaming,
chatbot_ai,
ai_name,
ai_chatbot_ai_type,
ai_chatbot_thread_id
]
# Chatbot persona configuration data
chatbots = [
{
"button": vaitor_chatbot_select_btn,
"name_state": chatbot_open_ai_name,
"avatar_images": vaitor_chatbot_avatar_images,
"description_value": vaitor_chatbot_description_value,
"chatbot_select_outputs": chatbot_select_outputs,
"chatbot_output": ai_chatbot
},
{
"button": foxcat_chatbot_select_btn,
"name_state": foxcat_chatbot_name,
"avatar_images": foxcat_avatar_images,
"description_value": foxcat_chatbot_description_value,
"chatbot_select_outputs": chatbot_select_outputs,
"chatbot_output": ai_chatbot
},
{
"button": lili_chatbot_select_btn,
"name_state": lili_chatbot_name,
"avatar_images": lili_avatar_images,
"description_value": lili_chatbot_description_value,
"chatbot_select_outputs": chatbot_select_outputs,
"chatbot_output": ai_chatbot
},
{
"button": maimai_chatbot_select_btn,
"name_state": maimai_chatbot_name,
"avatar_images": maimai_avatar_images,
"description_value": maimai_chatbot_description_value,
"chatbot_select_outputs": chatbot_select_outputs,
"chatbot_output": ai_chatbot
}
]
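# Each dict above describes one selectable persona. The helper below wires its select button:
# clicking runs chatbot_select to swap the visible chat UI, then update_avatar_images to refresh
# the chat window with that persona's avatar images and description.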
def setup_chatbot_select_button(chatbot_dict):
button = chatbot_dict["button"]
chatbot_name_state = chatbot_dict["name_state"]
avatar_images = chatbot_dict["avatar_images"]
description_value = chatbot_dict["description_value"]
chatbot_select_outputs = chatbot_dict["chatbot_select_outputs"]
chatbot_output = chatbot_dict["chatbot_output"]
button.click(
chatbot_select, # this handler may need adjusting to fit the current selection logic
inputs=[chatbot_name_state],
outputs=chatbot_select_outputs
).then(
update_avatar_images,
inputs=[avatar_images, description_value],
outputs=[chatbot_output],
scroll_to_output=True
)
for chatbot_dict in chatbots:
setup_chatbot_select_button(chatbot_dict)
# STREAMING CHATBOT SELECT
chatbot_open_ai_streaming_select_btn.click(
chatbot_select,
inputs=[chatbot_open_ai_streaming_name],
outputs=chatbot_select_outputs
).then(
create_thread_id,
inputs=[],
outputs=[streaming_chat_thread_id_state]
)
# ALL CHATBOT SELECT LIST
all_chatbot_select_btn.click(
show_all_chatbot_accordion,
inputs=[],
outputs=[chatbot_select_accordion, all_chatbot_select_btn]
)
# OPENAI ASSISTANT CHATBOT: wire up button click events
def setup_question_button_click(button, inputs_list, outputs_list, chat_func, scroll_to_output=True):
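# Generic helper: binds a suggested-question button so clicking it routes the question through chat_func.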
button.click(
chat_func,
inputs=inputs_list,
outputs=outputs_list,
scroll_to_output=scroll_to_output
)
# ๅ…ถไป–็ฒพ้ˆ ai_chatbot ๆจกๅผ
ai_send_button.click(
chat_with_any_ai,
inputs=[ai_chatbot_ai_type, password, video_id, user_data, trascript_state, key_moments, ai_msg, ai_chatbot, content_subject, content_grade, questions_answers_json, ai_chatbot_socratic_mode_btn, ai_chatbot_thread_id, ai_name],
outputs=[ai_msg, ai_chatbot, ai_send_button, ai_send_feedback_btn, ai_chatbot_thread_id],
scroll_to_output=True
)
ai_send_feedback_btn.click(
feedback_with_ai,
inputs=[ai_chatbot_ai_type, ai_chatbot, ai_chatbot_thread_id],
outputs=[ai_chatbot, ai_send_feedback_btn],
scroll_to_output=True
)
# ๅ…ถไป–็ฒพ้ˆ ai_chatbot ่ฟžๆŽฅ QA ๆŒ‰้’ฎ็‚นๅ‡ปไบ‹ไปถ
ai_chatbot_question_buttons = [ai_chatbot_question_1, ai_chatbot_question_2, ai_chatbot_question_3]
for question_btn in ai_chatbot_question_buttons:
inputs_list = [ai_chatbot_ai_type, password, video_id, user_data, trascript_state, key_moments, question_btn, ai_chatbot, content_subject, content_grade, questions_answers_json, ai_chatbot_socratic_mode_btn, ai_chatbot_thread_id, ai_name]
outputs_list = [ai_msg, ai_chatbot, ai_send_button, ai_send_feedback_btn, ai_chatbot_thread_id]
setup_question_button_click(question_btn, inputs_list, outputs_list, chat_with_any_ai)
# ็‚บ็”Ÿๆˆๅ•้กŒๆŒ‰้ˆ•่จญๅฎš็‰นๆฎŠ็š„้ปžๆ“Šไบ‹ไปถ
question_buttons = [
ai_chatbot_question_1,
ai_chatbot_question_2,
ai_chatbot_question_3
]
create_questions_btn.click(
change_questions,
inputs=[password, df_string_output],
outputs=question_buttons
)
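# Recording or uploading audio fills the ai_msg textbox; process_open_ai_audio_to_chatbot presumably
# performs speech-to-text via the OpenAI API, given its name (it is defined earlier in this file).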
ai_chatbot_audio_input.change(
process_open_ai_audio_to_chatbot,
inputs=[password, ai_chatbot_audio_input],
outputs=[ai_msg]
)
# Triggered when a YouTube link is entered
process_youtube_link_inputs = [password, youtube_link]
process_youtube_link_outputs = [
video_id,
questions_answers_json,
df_string_output,
summary_text,
df_summarise,
key_moments,
key_moments_html,
mind_map,
mind_map_html,
transcript_html,
simple_html_content,
slide_image,
slide_text,
reading_passage_text,
reading_passage,
content_subject,
content_grade,
]
update_state_inputs = [
content_subject,
content_grade,
df_string_output,
key_moments,
questions_answers_json,
]
update_state_outputs = [
content_subject_state,
content_grade_state,
trascript_state,
key_moments_state,
streaming_chat_thread_id_state,
ai_chatbot_question_1,
ai_chatbot_question_2,
ai_chatbot_question_3
]
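# Entering a link (or clicking the button) runs process_youtube_link to populate every content panel,
# then update_state caches subject/grade/transcript/key moments, creates a streaming chat thread id,
# and refreshes the three suggested-question buttons.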
youtube_link.change(
process_youtube_link,
inputs=process_youtube_link_inputs,
outputs=process_youtube_link_outputs
).then(
update_state,
inputs=update_state_inputs,
outputs=update_state_outputs
)
youtube_link_btn.click(
process_youtube_link,
inputs=process_youtube_link_inputs,
outputs=process_youtube_link_outputs
).then(
update_state,
inputs=update_state_inputs,
outputs=update_state_outputs
)
# --- CRUD admin ---
def setup_content_buttons(buttons_config):
for config in buttons_config:
button = config['button']
action = config['action']
inputs = config['inputs']
outputs = config['outputs']
button.click(
fn=action,
inputs=inputs,
outputs=outputs
)
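# Each entry maps an admin button to a CRUD action on one content kind: get/delete take (video_id, kind),
# create/update also take the source text, and enable_edit_mode (no inputs) presumably just unlocks the
# target textbox for editing. Results land back in that kind's textbox.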
content_buttons_config = [
# Transcript actions
{
'button': transcript_get_button,
'action': get_LLM_content,
'inputs': [video_id, transcript_kind],
'outputs': [df_string_output]
},
{
'button': transcript_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, transcript_kind],
'outputs': [df_string_output]
},
{
'button': transcript_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, transcript_kind],
'outputs': [df_string_output]
},
{
'button': transcript_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [df_string_output]
},
{
'button': transcript_update_button,
'action': update_LLM_content,
'inputs': [video_id, df_string_output, transcript_kind],
'outputs': [df_string_output]
},
# Reading passage actions
{
'button': reading_passage_get_button,
'action': get_LLM_content,
'inputs': [video_id, reading_passage_kind],
'outputs': [reading_passage_text]
},
{
'button': reading_passage_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, reading_passage_kind],
'outputs': [reading_passage_text]
},
{
'button': reading_passage_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, reading_passage_kind],
'outputs': [reading_passage_text]
},
{
'button': reading_passage_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [reading_passage_text]
},
{
'button': reading_passage_update_button,
'action': update_LLM_content,
'inputs': [video_id, reading_passage_text, reading_passage_kind],
'outputs': [reading_passage_text]
},
# Summary actions
{
'button': summary_get_button,
'action': get_LLM_content,
'inputs': [video_id, summary_kind],
'outputs': [summary_text]
},
{
'button': summary_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, summary_kind],
'outputs': [summary_text]
},
{
'button': summary_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, summary_kind],
'outputs': [summary_text]
},
{
'button': summary_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [summary_text]
},
{
'button': summary_update_button,
'action': update_LLM_content,
'inputs': [video_id, summary_text, summary_kind],
'outputs': [summary_text]
},
# Key moments actions
{
'button': key_moments_get_button,
'action': get_LLM_content,
'inputs': [video_id, key_moments_kind],
'outputs': [key_moments]
},
{
'button': key_moments_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, key_moments_kind],
'outputs': [key_moments]
},
{
'button': key_moments_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, key_moments_kind],
'outputs': [key_moments]
},
{
'button': key_moments_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [key_moments]
},
{
'button': key_moments_update_button,
'action': update_LLM_content,
'inputs': [video_id, key_moments, key_moments_kind],
'outputs': [key_moments]
},
# Questions actions
{
'button': questions_get_button,
'action': get_LLM_content,
'inputs': [video_id, questions_kind],
'outputs': [questions_json]
},
{
'button': questions_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, questions_kind],
'outputs': [questions_json]
},
{
'button': questions_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, questions_kind],
'outputs': [questions_json]
},
{
'button': questions_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [questions_json]
},
{
'button': questions_update_button,
'action': update_LLM_content,
'inputs': [video_id, questions_json, questions_kind],
'outputs': [questions_json]
},
# Questions answers actions
{
'button': questions_answers_get_button,
'action': get_LLM_content,
'inputs': [video_id, questions_answers_kind],
'outputs': [questions_answers_json]
},
{
'button': questions_answers_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, questions_answers_kind],
'outputs': [questions_answers_json]
},
{
'button': questions_answers_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, questions_answers_kind],
'outputs': [questions_answers_json]
},
{
'button': questions_answers_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [questions_answers_json]
},
{
'button': questions_answers_update_button,
'action': update_LLM_content,
'inputs': [video_id, questions_answers_json, questions_answers_kind],
'outputs': [questions_answers_json]
},
# Worksheet actions
{
'button': worksheet_get_button,
'action': get_LLM_content,
'inputs': [video_id, worksheet_kind],
'outputs': [worksheet_json]
},
{
'button': worksheet_create_button,
'action': create_LLM_content,
'inputs': [video_id, df_string_output, worksheet_kind],
'outputs': [worksheet_json]
},
{
'button': worksheet_delete_button,
'action': delete_LLM_content,
'inputs': [video_id, worksheet_kind],
'outputs': [worksheet_json]
},
{
'button': worksheet_edit_button,
'action': enable_edit_mode,
'inputs': [],
'outputs': [worksheet_json]
},
{
'button': worksheet_update_button,
'action': update_LLM_content,
'inputs': [video_id, worksheet_json, worksheet_kind],
'outputs': [worksheet_json]
},
]
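# Hypothetical sketch of registering one more kind, assuming the LLM-content helpers accept an arbitrary
# kind string and a matching textbox exists (mind_map_get_button and mind_map_kind are illustrative only):
# content_buttons_config.append({
#     'button': mind_map_get_button,        # hypothetical button
#     'action': get_LLM_content,
#     'inputs': [video_id, mind_map_kind],  # hypothetical kind textbox, e.g. value="mind_map"
#     'outputs': [mind_map]                 # existing mind_map textbox
# })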
setup_content_buttons(content_buttons_config)
# --- Education Material ---
def setup_education_buttons(buttons_config):
for config in buttons_config:
button = config["button"]
action = config["action"]
inputs = config["inputs"]
outputs = config["outputs"]
button.click(
fn=action,
inputs=inputs,
outputs=outputs
)
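# Each material type (worksheet / lesson plan / exit ticket) gets the same four actions: generate via
# get_ai_content, refine via generate_exam_fine_tune_result, export to Word via download_exam_result,
# and restore the first result via return_original_exam_result.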
education_buttons_config = [
# Worksheet-related buttons
{
"button": worksheet_content_btn,
"action": get_ai_content,
"inputs": [password, video_id, df_string_output, content_subject, content_grade, content_level, worksheet_algorithm, worksheet_content_type_name],
"outputs": [worksheet_exam_result_original, worksheet_exam_result, worksheet_prompt, worksheet_exam_result_prompt]
},
{
"button": worksheet_exam_result_fine_tune_btn,
"action": generate_exam_fine_tune_result,
"inputs": [password, worksheet_exam_result_prompt, df_string_output, worksheet_exam_result, worksheet_exam_result_fine_tune_prompt],
"outputs": [worksheet_exam_result]
},
{
"button": worksheet_download_exam_result_button,
"action": download_exam_result,
"inputs": [worksheet_exam_result],
"outputs": [worksheet_exam_result_word_link]
},
{
"button": worksheet_exam_result_retrun_original,
"action": return_original_exam_result,
"inputs": [worksheet_exam_result_original],
"outputs": [worksheet_exam_result]
},
# Lesson-plan-related buttons
{
"button": lesson_plan_btn,
"action": get_ai_content,
"inputs": [password, video_id, df_string_output, content_subject, content_grade, content_level, lesson_plan_time, lesson_plan_content_type_name],
"outputs": [lesson_plan_exam_result_original, lesson_plan_exam_result, lesson_plan_prompt, lesson_plan_exam_result_prompt]
},
{
"button": lesson_plan_exam_result_fine_tune_btn,
"action": generate_exam_fine_tune_result,
"inputs": [password, lesson_plan_exam_result_prompt, df_string_output, lesson_plan_exam_result, lesson_plan_exam_result_fine_tune_prompt],
"outputs": [lesson_plan_exam_result]
},
{
"button": lesson_plan_download_exam_result_button,
"action": download_exam_result,
"inputs": [lesson_plan_exam_result],
"outputs": [lesson_plan_exam_result_word_link]
},
{
"button": lesson_plan_exam_result_retrun_original,
"action": return_original_exam_result,
"inputs": [lesson_plan_exam_result_original],
"outputs": [lesson_plan_exam_result]
},
# Exit-ticket-related buttons
{
"button": exit_ticket_btn,
"action": get_ai_content,
"inputs": [password, video_id, df_string_output, content_subject, content_grade, content_level, exit_ticket_time, exit_ticket_content_type_name],
"outputs": [exit_ticket_exam_result_original, exit_ticket_exam_result, exit_ticket_prompt, exit_ticket_exam_result_prompt]
},
{
"button": exit_ticket_exam_result_fine_tune_btn,
"action": generate_exam_fine_tune_result,
"inputs": [password, exit_ticket_exam_result_prompt, df_string_output, exit_ticket_exam_result, exit_ticket_exam_result_fine_tune_prompt],
"outputs": [exit_ticket_exam_result]
},
{
"button": exit_ticket_download_exam_result_button,
"action": download_exam_result,
"inputs": [exit_ticket_exam_result],
"outputs": [exit_ticket_exam_result_word_link]
},
{
"button": exit_ticket_exam_result_retrun_original,
"action": return_original_exam_result,
"inputs": [exit_ticket_exam_result_original],
"outputs": [exit_ticket_exam_result]
}
]
setup_education_buttons(education_buttons_config)
# init_params
init_outputs = [
admin,
reading_passage_admin,
summary_admmin,
see_details,
worksheet_accordion,
lesson_plan_accordion,
exit_ticket_accordion,
password,
youtube_link,
chatbot_open_ai_streaming,
chatbot_ai,
ai_chatbot_params,
]
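# On page load, init_params (defined earlier) receives the current youtube_link value and sets the
# initial visibility/state of the components listed above.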
demo.load(
init_params,
inputs=[youtube_link],
outputs=init_outputs
)
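# allowed_paths permits Gradio to serve files from the local "videos" directory.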
demo.launch(allowed_paths=["videos"])