tube / app-backup.py
ginipick's picture
Rename app (2).py to app-backup.py
6170788 verified
raw
history blame
8.21 kB
import gradio as gr
import requests
import re
import os
import json
import time
import threading
from googleapiclient.discovery import build
from huggingface_hub import InferenceClient
from pytube import YouTube
import whisper
import logging
# 둜그 μ„€μ •
logging.basicConfig(level=logging.INFO)
# Whisper λͺ¨λΈ λ‘œλ“œ
model = whisper.load_model("base")
# YouTube API ν‚€
API_KEY = 'AIzaSyDUz3wkGal0ewRtPlzeMit88bV4hS4ZIVY'
# YouTube API μ„œλΉ„μŠ€ λΉŒλ“œ
youtube = build('youtube', 'v3', developerKey=API_KEY)
# Hugging Face API μ„€μ •
client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))
WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"
COMMENTS_FILE = 'comments.json'
DEFAULT_SYSTEM_PROMPT = "λŒ€ν™”μ‹œ λ°˜λ“œμ‹œ λ‚˜μ˜ 이름 'GPTube'λ₯Ό 밝히며 ν•œκΈ€λ‘œ 인사λ₯Όν•˜λΌ. λ°˜λ“œμ‹œ 'ν•œκΈ€'(ν•œκ΅­μ–΄)둜 250 토큰 μ΄λ‚΄λ‘œ 닡변을 μƒμ„±ν•˜κ³  좜λ ₯ν•˜λΌ. Respond to the following YouTube comment in a friendly and helpful manner:"
stop_event = threading.Event() # μŠ€λ ˆλ“œ 쀑지λ₯Ό μœ„ν•œ 이벀트
def load_existing_comments():
if os.path.exists(COMMENTS_FILE):
with open(COMMENTS_FILE, 'r') as file:
return json.load(file)
return []
def save_comments(comments):
with open(COMMENTS_FILE, 'w') as file:
json.dump(comments, file)
def download_audio(video_url):
yt = YouTube(video_url)
audio = yt.streams.filter(only_audio=True).first()
audio_path = audio.download(output_path=".")
file_stats = os.stat(audio_path)
logging.info(f'Size of audio file in Bytes: {file_stats.st_size}')
if file_stats.st_size <= 30000000: # Check the file size limit
base, ext = os.path.splitext(audio_path)
new_file = base + '.mp3'
os.rename(audio_path, new_file)
return new_file
else:
logging.error('Videos for transcription on this space are limited to about 1.5 hours. Please contact support for more information.')
return None
def generate_transcript(audio_path):
try:
if not audio_path or not os.path.exists(audio_path):
raise ValueError("μœ νš¨ν•œ μ˜€λ””μ˜€ 파일 κ²½λ‘œκ°€ μ•„λ‹™λ‹ˆλ‹€.")
result = model.transcribe(audio_path)
return result['text'].strip()
except Exception as e:
logging.error(f"Exception during transcription: {str(e)}")
return f"전사 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}"
def generate_reply(comment_text, system_prompt):
prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
response = client.text_generation(
prompt=prompt,
max_new_tokens=250,
temperature=0.7,
top_p=0.9
)
if isinstance(response, dict) and 'generated_text' in response:
return response['generated_text']
return response
def send_webhook(data):
response = requests.post(WEBHOOK_URL, json=data)
return response.status_code, response.text
def get_video_comments(video_id):
try:
comments = []
request = youtube.commentThreads().list(
part='snippet',
videoId=video_id,
maxResults=100, #λŒ“κΈ€ μ½μ–΄λ“€μ΄λŠ” 수 μ •μ˜
textFormat='plainText'
)
response = request.execute()
while request is not None:
for item in response['items']:
snippet = item['snippet']['topLevelComment']['snippet']
comment = {
'comment_id': item['snippet']['topLevelComment']['id'],
'author': snippet['authorDisplayName'],
'published_at': snippet['publishedAt'],
'text': snippet['textDisplay'],
'reply_count': item['snippet']['totalReplyCount']
}
comments.append(comment)
if 'nextPageToken' in response:
request = youtube.commentThreads().list(
part='snippet',
videoId=video_id,
pageToken=response['nextPageToken'],
maxResults=100, #λŒ“κΈ€ μ½μ–΄λ“€μ΄λŠ” 수 μ •μ˜
textFormat='plainText'
)
response = request.execute()
else:
break
return comments
except Exception as e:
return [{'error': str(e)}]
def fetch_comments(video_url, system_prompt):
log_entries = []
video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
if video_id_match:
video_id = video_id_match.group(1)
audio_path = download_audio(video_url)
if not audio_path:
return "μ˜€λ””μ˜€λ₯Ό λ‹€μš΄λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€."
transcript = generate_transcript(audio_path)
existing_comments = load_existing_comments()
new_comments = get_video_comments(video_id)
if not new_comments or 'error' in new_comments[0]:
return "λŒ“κΈ€μ„ 찾을 수 μ—†κ±°λ‚˜ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€."
recent_new_comments = [c for c in new_comments if c['comment_id'] not in {c['comment_id'] for c in existing_comments} and c['reply_count'] == 0]
if recent_new_comments:
for most_recent_comment in recent_new_comments:
combined_prompt = f"{transcript}\n\n{system_prompt}"
reply_text = generate_reply(most_recent_comment['text'], combined_prompt)
webhook_data = {
"comment_id": most_recent_comment['comment_id'],
"author": most_recent_comment['author'],
"published_at": most_recent_comment['published_at'],
"text": most_recent_comment['text'],
"reply_text": reply_text
}
webhook_status, webhook_response = send_webhook(webhook_data)
log_entries.append(f"졜근 λŒ“κΈ€: {most_recent_comment['text']}\n\nλ‹΅λ³€ 생성: {reply_text}\n\nμ›Ήν›… 응닡: {webhook_status} - {webhook_response}")
existing_comments.append(most_recent_comment)
save_comments(existing_comments)
else:
log_entries.append("μƒˆλ‘œμš΄ λŒ“κΈ€μ΄ μ—†μŠ΅λ‹ˆλ‹€.")
else:
log_entries.append("μœ νš¨ν•˜μ§€ μ•Šμ€ YouTube URLμž…λ‹ˆλ‹€.")
return "\n\n".join(log_entries)
def background_fetch_comments():
while not stop_event.is_set():
result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT) # URLκ³Ό ν”„λ‘¬ν”„νŠΈ μ‹€μ œ μ‚¬μš© μ˜ˆμ‹œ
print(result)
time.sleep(10)
def start_background_fetch():
threading.Thread(target=background_fetch_comments).start()
def stop_background_fetch():
stop_event.set()
def get_text(video_url):
audio_path = download_audio(video_url)
if not audio_path:
return "μ˜€λ””μ˜€λ₯Ό λ‹€μš΄λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€."
transcript = generate_transcript(audio_path)
return transcript
# Gradio μΈν„°νŽ˜μ΄μŠ€ μ •μ˜
demo = gr.Blocks()
with demo:
gr.Markdown("<h1><center>GPTube</center></h1>")
with gr.Row():
input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL')
input_text_prompt = gr.Textbox(placeholder='μ‹œμŠ€ν…œ ν”„λ‘¬ν”„νŠΈ', label='μ‹œμŠ€ν…œ ν”„λ‘¬ν”„νŠΈ', value=DEFAULT_SYSTEM_PROMPT, lines=5)
with gr.Row():
result_button_transcribe = gr.Button('Transcribe')
result_button_comments = gr.Button('Fetch Comments and Generate Reply')
with gr.Row():
output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20)
output_text_prompt = gr.Textbox(placeholder='응닡 ν…μŠ€νŠΈ', label='응닡 ν…μŠ€νŠΈ', lines=20)
result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api")
result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api")
# μΈν„°νŽ˜μ΄μŠ€ μ‹€ν–‰
demo.launch()