import discord import logging import os import re import asyncio import subprocess import aiohttp from huggingface_hub import InferenceClient from googleapiclient.discovery import build from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api.formatters import TextFormatter from dotenv import load_dotenv # 환경 변수 로드 load_dotenv() # 로깅 설정 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(name)s:%(message)s', handlers=[logging.StreamHandler()]) # 인텐트 설정 intents = discord.Intents.default() intents.message_content = True intents.messages = True intents.guilds = True intents.guild_messages = True # 추론 API 클라이언트 설정 hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=os.getenv("HF_TOKEN")) # YouTube API 설정 API_KEY = os.getenv("YOUTUBE_API_KEY") youtube_service = build('youtube', 'v3', developerKey=API_KEY) # 특정 채널 ID SPECIFIC_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID")) # 웹훅 URL 설정 WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTY1MDYzMjA0MzA1MjY4NTUzMDUxMzUi_pc" # 전송 실패 시 재시도 횟수 MAX_RETRIES = 3 class MyClient(discord.Client): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.is_processing = False self.session = None async def on_ready(self): logging.info(f'{self.user}로 로그인되었습니다!') # web.py 파일 실행 subprocess.Popen(["python", "web.py"]) logging.info("Web.py 서버가 시작되었습니다.") # aiohttp 클라이언트 세션 생성 self.session = aiohttp.ClientSession() # 봇이 시작될 때 안내 메시지를 전송 channel = self.get_channel(SPECIFIC_CHANNEL_ID) if channel: await channel.send("유튜브 비디오 URL을 입력하면, 자막과 댓글을 기반으로 답글을 작성합니다.") async def on_message(self, message): if message.author == self.user: return if not self.is_message_in_specific_channel(message): return if self.is_processing: return self.is_processing = True try: video_id = extract_video_id(message.content) if video_id: transcript = await get_best_available_transcript(video_id) comments = await get_video_comments(video_id) if comments and transcript: replies = await generate_replies(comments, transcript) await create_thread_and_send_replies(message, video_id, comments, replies, self.session) else: await message.channel.send("자막이나 댓글을 가져올 수 없습니다.") else: await message.channel.send("유효한 유튜브 비디오 URL을 제공해 주세요.") finally: self.is_processing = False def is_message_in_specific_channel(self, message): return message.channel.id == SPECIFIC_CHANNEL_ID or ( isinstance(message.channel, discord.Thread) and message.channel.parent_id == SPECIFIC_CHANNEL_ID ) async def close(self): # aiohttp 클라이언트 세션 종료 if self.session: await self.session.close() await super().close() def extract_video_id(url): video_id = None youtube_regex = ( r'(https?://)?(www\.)?' '(youtube|youtu|youtube-nocookie)\.(com|be)/' '(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})') match = re.match(youtube_regex, url) if match: video_id = match.group(6) logging.debug(f'추출된 비디오 ID: {video_id}') return video_id async def get_best_available_transcript(video_id): try: transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['ko']) except Exception as e: logging.warning(f'한국어 자막 가져오기 오류: {e}') try: transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en']) except Exception as e: logging.warning(f'영어 자막 가져오기 오류: {e}') try: transcripts = YouTubeTranscriptApi.list_transcripts(video_id) transcript = transcripts.find_manually_created_transcript().fetch() except Exception as e: logging.error(f'대체 자막 가져오기 오류: {e}') return None formatter = TextFormatter() transcript_text = formatter.format_transcript(transcript) logging.debug(f'가져온 자막: {transcript_text}') return transcript_text async def get_video_comments(video_id): comments = [] response = youtube_service.commentThreads().list( part='snippet', videoId=video_id, maxResults=100 # 최대 100개의 댓글 가져오기 ).execute() for item in response.get('items', []): comment = item['snippet']['topLevelComment']['snippet']['textOriginal'] comment_id = item['snippet']['topLevelComment']['id'] comments.append((comment, comment_id)) logging.debug(f'가져온 댓글: {comments}') return comments async def generate_replies(comments, transcript): replies = [] for comment, _ in comments: messages = [ {"role": "system", "content": """너의 이름은 OpenFreeAI이다. 답글 생성후 가장 마지막에 너의 이름을 밝히고 공손하게 인사하라. 비디오 자막: {transcript}"""}, {"role": "user", "content": comment} ] loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, lambda: hf_client.chat_completion( messages, max_tokens=250, temperature=0.7, top_p=0.85)) if response.choices and response.choices[0].message: reply = response.choices[0].message['content'].strip() else: reply = "답글을 생성할 수 없습니다." replies.append(reply) logging.debug(f'생성된 답글: {replies}') return replies async def send_webhook_data(session, chunk_data, chunk_number): for attempt in range(MAX_RETRIES): try: async with session.post(WEBHOOK_URL, json=chunk_data) as resp: if resp.status == 200: logging.info(f"웹훅으로 데이터 전송 성공: {chunk_number} 번째 시도") return True # 성공 시 종료 else: logging.error(f"웹훅으로 데이터 전송 실패: {resp.status}, {chunk_number} 번째 시도") except aiohttp.ClientError as e: logging.error(f"웹훅 전송 중 오류 발생: {e}, {chunk_number} 번째 시도") await asyncio.sleep(1) # 재시도 전에 잠시 대기 return False # 재시도 횟수 초과 시 실패로 간주 async def create_thread_and_send_replies(message, video_id, comments, replies, session): thread = await message.channel.create_thread(name=f"{message.author.name}의 댓글 답글", message=message) webhook_data = {"video_id": video_id, "replies": []} for (comment, comment_id), reply in zip(comments, replies): embed = discord.Embed(description=f"**댓글**: {comment}\n**답글**: {reply}") await thread.send(embed=embed) # 웹훅 데이터 준비 (comment id 포함) webhook_data["replies"].append({"comment": comment, "reply": reply, "comment_id": comment_id}) # 데이터를 여러 번 나누어 전송 chunk_size = 1 # 전송할 데이터의 개수를 1로 설정하여 각 데이터를 별도로 전송 for i in range(0, len(webhook_data["replies"]), chunk_size): chunk = webhook_data["replies"][i:i+chunk_size] chunk_data = {"video_id": video_id, "replies": chunk} success = await send_webhook_data(session, chunk_data, i // chunk_size + 1) if not success: logging.error(f"데이터 전송 실패: {i // chunk_size + 1} 번째 청크") if __name__ == "__main__": discord_client = MyClient(intents=intents) discord_client.run(os.getenv('DISCORD_TOKEN'))