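"""GPTube: a Gradio app that transcribes a YouTube video with Whisper, fetches the
video's comments through the YouTube Data API, generates replies with Llama 3 via
the Hugging Face InferenceClient, and forwards each reply to a Pabbly webhook."""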

import gradio as gr
import requests
import re
import os
import json
import time
import threading
import logging

from googleapiclient.discovery import build
from huggingface_hub import InferenceClient
from pytube import YouTube
import whisper

logging.basicConfig(level=logging.INFO)

# Load the Whisper "base" model once at startup; it is reused for every transcription.
model = whisper.load_model("base")

# The YouTube Data API key is read from the environment rather than hardcoded;
# YOUTUBE_API_KEY is an assumed variable name (see the setup note below).
API_KEY = os.getenv('YOUTUBE_API_KEY')

youtube = build('youtube', 'v3', developerKey=API_KEY)

client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))

WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"
COMMENTS_FILE = 'comments.json'

DEFAULT_SYSTEM_PROMPT = (
    "Always introduce yourself by your name 'GPTube' and greet the user in Korean. "
    "Always write and output your answer in Korean (Hangul) within 250 tokens. "
    "Respond to the following YouTube comment in a friendly and helpful manner:"
)

# Signals the background comment-polling thread to stop.
stop_event = threading.Event()
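
# Assumed environment setup (both variables are read above):
#   export YOUTUBE_API_KEY=...   # Google API key with the YouTube Data API v3 enabled
#   export HF_TOKEN=...          # Hugging Face token with access to Meta-Llama-3-70B-Instruct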

def load_existing_comments():
    """Return previously processed comments from COMMENTS_FILE, or an empty list."""
    if os.path.exists(COMMENTS_FILE):
        with open(COMMENTS_FILE, 'r') as file:
            return json.load(file)
    return []

def save_comments(comments):
    """Persist processed comments so the same comment is not replied to twice."""
    with open(COMMENTS_FILE, 'w') as file:
        json.dump(comments, file)
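
# comments.json holds a JSON array of the comment dicts built in get_video_comments,
# e.g. (illustrative values):
#   [{"comment_id": "...", "author": "...", "published_at": "2024-01-01T00:00:00Z",
#     "text": "...", "reply_count": 0}]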

def download_audio(video_url):
    """Download the audio-only stream of a YouTube video and rename it to .mp3."""
    yt = YouTube(video_url)
    audio = yt.streams.filter(only_audio=True).first()
    audio_path = audio.download(output_path=".")

    file_stats = os.stat(audio_path)
    logging.info(f'Size of audio file in bytes: {file_stats.st_size}')

    # Cap uploads at ~30 MB of audio, which corresponds to roughly 1.5 hours of video.
    if file_stats.st_size <= 30000000:
        base, ext = os.path.splitext(audio_path)
        new_file = base + '.mp3'
        os.rename(audio_path, new_file)
        return new_file
    else:
        logging.error('Videos for transcription on this space are limited to about 1.5 hours. Please contact support for more information.')
        return None

def generate_transcript(audio_path):
    """Transcribe the downloaded audio file with Whisper."""
    try:
        if not audio_path or not os.path.exists(audio_path):
            raise ValueError("Invalid audio file path.")

        result = model.transcribe(audio_path)
        return result['text'].strip()
    except Exception as e:
        logging.error(f"Exception during transcription: {str(e)}")
        return f"An error occurred during transcription: {str(e)}"

def generate_reply(comment_text, system_prompt):
    """Generate a reply to a comment with Llama 3 via the Inference API."""
    prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
    response = client.text_generation(
        prompt=prompt,
        max_new_tokens=250,
        temperature=0.7,
        top_p=0.9
    )
    # text_generation returns a plain string by default; also handle the
    # detailed dict form in case the client returns one.
    if isinstance(response, dict) and 'generated_text' in response:
        return response['generated_text']
    return response
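
# Hypothetical usage, e.g. from a REPL:
#   reply = generate_reply("Great video, thanks!", DEFAULT_SYSTEM_PROMPT)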

def send_webhook(data):
    """POST the comment/reply payload to the Pabbly webhook."""
    response = requests.post(WEBHOOK_URL, json=data)
    return response.status_code, response.text

def get_video_comments(video_id):
    """Fetch all top-level comments for a video, following pagination."""
    try:
        comments = []
        request = youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            maxResults=100,
            textFormat='plainText'
        )
        response = request.execute()
        while True:
            for item in response['items']:
                snippet = item['snippet']['topLevelComment']['snippet']
                comment = {
                    'comment_id': item['snippet']['topLevelComment']['id'],
                    'author': snippet['authorDisplayName'],
                    'published_at': snippet['publishedAt'],
                    'text': snippet['textDisplay'],
                    'reply_count': item['snippet']['totalReplyCount']
                }
                comments.append(comment)
            # Follow the pagination cursor until the last page is reached.
            if 'nextPageToken' in response:
                request = youtube.commentThreads().list(
                    part='snippet',
                    videoId=video_id,
                    pageToken=response['nextPageToken'],
                    maxResults=100,
                    textFormat='plainText'
                )
                response = request.execute()
            else:
                break
        return comments
    except Exception as e:
        return [{'error': str(e)}]

def fetch_comments(video_url, system_prompt):
    """Transcribe the video, then generate and forward replies to new comments."""
    log_entries = []
    video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
    if video_id_match:
        video_id = video_id_match.group(1)
        audio_path = download_audio(video_url)
        if not audio_path:
            return "Could not download the audio."

        transcript = generate_transcript(audio_path)

        existing_comments = load_existing_comments()
        new_comments = get_video_comments(video_id)

        if not new_comments or 'error' in new_comments[0]:
            return "No comments were found, or an error occurred."

        # Only handle comments that have not been processed before and have no replies yet.
        existing_ids = {c['comment_id'] for c in existing_comments}
        recent_new_comments = [c for c in new_comments if c['comment_id'] not in existing_ids and c['reply_count'] == 0]

        if recent_new_comments:
            for most_recent_comment in recent_new_comments:
                # Prepend the transcript so the reply is grounded in the video content.
                combined_prompt = f"{transcript}\n\n{system_prompt}"
                reply_text = generate_reply(most_recent_comment['text'], combined_prompt)
                webhook_data = {
                    "comment_id": most_recent_comment['comment_id'],
                    "author": most_recent_comment['author'],
                    "published_at": most_recent_comment['published_at'],
                    "text": most_recent_comment['text'],
                    "reply_text": reply_text
                }
                webhook_status, webhook_response = send_webhook(webhook_data)
                log_entries.append(f"Recent comment: {most_recent_comment['text']}\n\nGenerated reply: {reply_text}\n\nWebhook response: {webhook_status} - {webhook_response}")
                existing_comments.append(most_recent_comment)
            save_comments(existing_comments)
        else:
            log_entries.append("No new comments.")
    else:
        log_entries.append("Invalid YouTube URL.")
    return "\n\n".join(log_entries)

def background_fetch_comments():
    """Poll the hardcoded demo video for new comments every 10 seconds until stopped."""
    while not stop_event.is_set():
        result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT)
        print(result)
        time.sleep(10)

def start_background_fetch():
    threading.Thread(target=background_fetch_comments).start()

def stop_background_fetch():
    stop_event.set()
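
# Note: the polling thread uses threading.Thread's default daemon=False, so
# stop_background_fetch() must be called for the process to exit cleanly.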

def get_text(video_url):
    """Gradio handler: download the video's audio and return its Whisper transcript."""
    audio_path = download_audio(video_url)
    if not audio_path:
        return "Could not download the audio."

    transcript = generate_transcript(audio_path)
    return transcript

demo = gr.Blocks()

with demo:
    gr.Markdown("<h1><center>GPTube</center></h1>")

    with gr.Row():
        input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL')
        input_text_prompt = gr.Textbox(placeholder='System prompt', label='System prompt', value=DEFAULT_SYSTEM_PROMPT, lines=5)

    with gr.Row():
        result_button_transcribe = gr.Button('Transcribe')
        result_button_comments = gr.Button('Fetch Comments and Generate Reply')

    with gr.Row():
        output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20)
        output_text_prompt = gr.Textbox(placeholder='Reply text', label='Reply text', lines=20)

    result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api")
    result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api")

demo.launch()
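
# A minimal sketch of calling the named API endpoints remotely with gradio_client
# (assumes the app is reachable at Gradio's default local address):
#
#   from gradio_client import Client
#   remote = Client("http://127.0.0.1:7860/")
#   transcript = remote.predict("https://www.youtube.com/watch?v=dQw4w9WgXcQ",
#                               api_name="/transcribe_api")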