# gradio final ver ---------------------------- import torch from torch import nn import torch.nn.functional as F import torch.optim as optim from torch.utils.data import Dataset, DataLoader import gluonnlp as nlp import numpy as np from tqdm import tqdm, tqdm_notebook import pandas as pd import ast import os # Hugging Face를 통한 모델 및 토크나이저 Import from kobert_tokenizer import KoBERTTokenizer from transformers import BertModel from transformers import AdamW from transformers.optimization import get_cosine_schedule_with_warmup n_devices = torch.cuda.device_count() print(n_devices) for i in range(n_devices): print(torch.cuda.get_device_name(i)) if torch.cuda.is_available(): device = torch.device("cuda") print('There are %d GPU(s) available.' % torch.cuda.device_count()) print('We will use the GPU:', torch.cuda.get_device_name(0)) else: device = torch.device("cpu") print('No GPU available, using the CPU instead.') max_len = 64 batch_size = 32 warmup_ratio = 0.1 num_epochs = 5 max_grad_norm = 1 log_interval = 200 learning_rate = 1e-5 class BERTSentenceTransform: r"""BERT style data transformation. Parameters ---------- tokenizer : BERTTokenizer. Tokenizer for the sentences. max_seq_length : int. Maximum sequence length of the sentences. pad : bool, default True Whether to pad the sentences to maximum length. pair : bool, default True Whether to transform sentences or sentence pairs. """ # 입력으로 받은 tokenizerm 최대 시퀀스 길이, vocab, pad 및 pair 설정 def __init__(self, tokenizer, max_seq_length,vocab, pad=True, pair=True): self._tokenizer = tokenizer self._max_seq_length = max_seq_length self._pad = pad self._pair = pair self._vocab = vocab # 입력된 문장 또는 문장 쌍을 BERT 모델이 사용할 수 있는 형식으로 변환 def __call__(self, line): """Perform transformation for sequence pairs or single sequences. The transformation is processed in the following steps: - tokenize the input sequences - insert [CLS], [SEP] as necessary - generate type ids to indicate whether a token belongs to the first sequence or the second sequence. - generate valid length For sequence pairs, the input is a tuple of 2 strings: text_a, text_b. Inputs: text_a: 'is this jacksonville ?' text_b: 'no it is not' Tokenization: text_a: 'is this jack ##son ##ville ?' text_b: 'no it is not .' Processed: tokens: '[CLS] is this jack ##son ##ville ? [SEP] no it is not . [SEP]' type_ids: 0 0 0 0 0 0 0 0 1 1 1 1 1 1 valid_length: 14 For single sequences, the input is a tuple of single string: text_a. Inputs: text_a: 'the dog is hairy .' Tokenization: text_a: 'the dog is hairy .' Processed: text_a: '[CLS] the dog is hairy . [SEP]' type_ids: 0 0 0 0 0 0 0 valid_length: 7 Parameters ---------- line: tuple of str Input strings. For sequence pairs, the input is a tuple of 2 strings: (text_a, text_b). For single sequences, the input is a tuple of single string: (text_a,). Returns ------- np.array: input token ids in 'int32', shape (batch_size, seq_length) np.array: valid length in 'int32', shape (batch_size,) np.array: input token type ids in 'int32', shape (batch_size, seq_length) """ # convert to unicode text_a = line[0] if self._pair: assert len(line) == 2 text_b = line[1] tokens_a = self._tokenizer.tokenize(text_a) tokens_b = None if self._pair: tokens_b = self._tokenizer(text_b) if tokens_b: # Modifies `tokens_a` and `tokens_b` in place so that the total # length is less than the specified length. # Account for [CLS], [SEP], [SEP] with "- 3" self._truncate_seq_pair(tokens_a, tokens_b, self._max_seq_length - 3) else: # Account for [CLS] and [SEP] with "- 2" if len(tokens_a) > self._max_seq_length - 2: tokens_a = tokens_a[0:(self._max_seq_length - 2)] # The embedding vectors for `type=0` and `type=1` were learned during # pre-training and are added to the wordpiece embedding vector # (and position vector). This is not *strictly* necessary since # the [SEP] token unambiguously separates the sequences, but it makes # it easier for the model to learn the concept of sequences. # For classification tasks, the first vector (corresponding to [CLS]) is # used as as the "sentence vector". Note that this only makes sense because # the entire model is fine-tuned. #vocab = self._tokenizer.vocab vocab = self._vocab tokens = [] tokens.append(vocab.cls_token) tokens.extend(tokens_a) tokens.append(vocab.sep_token) segment_ids = [0] * len(tokens) if tokens_b: tokens.extend(tokens_b) tokens.append(vocab.sep_token) segment_ids.extend([1] * (len(tokens) - len(segment_ids))) input_ids = self._tokenizer.convert_tokens_to_ids(tokens) # The valid length of sentences. Only real tokens are attended to. valid_length = len(input_ids) if self._pad: # Zero-pad up to the sequence length. padding_length = self._max_seq_length - valid_length # use padding tokens for the rest input_ids.extend([vocab[vocab.padding_token]] * padding_length) segment_ids.extend([0] * padding_length) return np.array(input_ids, dtype='int32'), np.array(valid_length, dtype='int32'),\ np.array(segment_ids, dtype='int32') class BERTDataset(Dataset): def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len, pad, pair): transform = BERTSentenceTransform(bert_tokenizer, max_seq_length=max_len,vocab=vocab, pad=pad, pair=pair) #transform = nlp.data.BERTSentenceTransform( # tokenizer, max_seq_length=max_len, pad=pad, pair=pair) self.sentences = [transform([i[sent_idx]]) for i in dataset] self.labels = [np.int32(i[label_idx]) for i in dataset] def __getitem__(self, i): return (self.sentences[i] + (self.labels[i], )) def __len__(self): return (len(self.labels)) tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1') bertmodel = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False) vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]') # Kobert_softmax class BERTClassifier(nn.Module): def __init__(self, bert, hidden_size=768, num_classes=6, dr_rate=None, params=None): super(BERTClassifier, self).__init__() self.bert = bert self.dr_rate = dr_rate self.softmax = nn.Softmax(dim=1) # Softmax로 변경 self.classifier = nn.Sequential( nn.Dropout(p=0.5), nn.Linear(in_features=hidden_size, out_features=512), nn.Linear(in_features=512, out_features=num_classes), ) # 정규화 레이어 추가 (Layer Normalization) self.layer_norm = nn.LayerNorm(768) # 드롭아웃 self.dropout = nn.Dropout(p=dr_rate) def gen_attention_mask(self, token_ids, valid_length): attention_mask = torch.zeros_like(token_ids) for i, v in enumerate(valid_length): attention_mask[i][:v] = 1 return attention_mask.float() def forward(self, token_ids, valid_length, segment_ids): attention_mask = self.gen_attention_mask(token_ids, valid_length) _, pooler = self.bert(input_ids=token_ids, token_type_ids=segment_ids.long(), attention_mask=attention_mask.float().to(token_ids.device)) pooled_output = self.dropout(pooler) normalized_output = self.layer_norm(pooled_output) out = self.classifier(normalized_output) # LayerNorm 적용 pooler = self.layer_norm(pooler) if self.dr_rate: pooler = self.dropout(pooler) logits = self.classifier(pooler) # 분류를 위한 로짓 값 계산 probabilities = self.softmax(logits) # Softmax로 각 클래스의 확률 계산 return probabilities # 각 클래스에 대한 확률 반환 model = torch.load('./model_weights_softmax(model).pth', map_location=torch.device('cpu')) model.eval() # 멜론 데이터 불러오기 melon_data = pd.read_csv('./melon_data.csv') melon_emotions = pd.read_csv('./melon_emotions_final.csv') melon_emotions = pd.merge(melon_emotions, melon_data, left_on='Title', right_on='title', how='inner') melon_emotions = melon_emotions[['singer', 'Title', 'genre','Emotions']] melon_emotions = melon_emotions.drop_duplicates(subset='Title', keep='first') melon_emotions['Emotions'] = melon_emotions['Emotions'].apply(lambda x: ast.literal_eval(x)) emotions = melon_emotions['Emotions'].to_list() #gradio import numpy as np import pandas as pd import requests from PIL import Image import torch from transformers import AutoProcessor, AutoModelForZeroShotImageClassification, pipeline import gradio as gr import openai from sklearn.metrics.pairwise import cosine_similarity import ast ###### 기본 설정 ###### # OpenAI API 키 설정 api_key = os.getenv("OPEN_AI_KEY") openai.api_key = api_key if openai.api_key: print("Private Key:", openai.api_key) else: print("Private Key not set.") # 모델 및 프로세서 로드 processor = AutoProcessor.from_pretrained("openai/clip-vit-large-patch14") model_clip = AutoModelForZeroShotImageClassification.from_pretrained("openai/clip-vit-large-patch14") tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1') # 예측 레이블 labels = ['a photo of a happy face', 'a photo of a joyful face', 'a photo of a loving face', 'a photo of an angry face', 'a photo of a melancholic face', 'a photo of a lonely face'] ###### 얼굴 감정 벡터 예측 함수 ###### def predict_face_emotion(image): # 이미지가 None이거나 잘못된 경우 if image is None: return np.zeros(len(labels)) # 빈 벡터 반환 # PIL 이미지를 RGB로 변환 image = image.convert("RGB") # CLIP 모델의 processor를 이용한 전처리 inputs = processor(text=labels, images=image, return_tensors="pt", padding=True) # pixel_values가 4차원인지 확인 후 강제 변환 pixel_values = inputs["pixel_values"] # (batch_size, channels, height, width) # CLIP 모델 예측: forward에 올바른 입력 전달 with torch.no_grad(): outputs = model_clip(pixel_values=pixel_values, input_ids=inputs["input_ids"]) # 확률값 계산 probs = outputs.logits_per_image.softmax(dim=1)[0] return probs.numpy() ###### 텍스트 감정 벡터 예측 함수 ###### sentence_emotions = [] def predict_text_emotion(predict_sentence): if not isinstance(predict_sentence, str): predict_sentence = str(predict_sentence) data = [predict_sentence, '0'] dataset_another = [data] another_test = BERTDataset(dataset_another, 0, 1, tokenizer, vocab, max_len, True, False) test_dataloader = torch.utils.data.DataLoader(another_test, batch_size=1, num_workers=5) model.eval() for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader): token_ids = token_ids.long().to(device) segment_ids = segment_ids.long().to(device) out = model(token_ids, valid_length, segment_ids) for i in out: logits = i.detach().cpu().numpy() emotions = [value.item() for value in i] sentence_emotions.append(emotions) return sentence_emotions[0] # 최종 리스트 반환 ###### 최종 감정 벡터 계산 ###### def generate_final_emotion_vector(diary_input, image_input): # 텍스트 감정 벡터 예측 text_vector = predict_text_emotion(diary_input) # 얼굴 감정 벡터 예측 image_vector = predict_face_emotion(image_input) text_vector = np.array(text_vector, dtype=float) image_vector = np.array(image_vector, dtype=float) print(text_vector) print(image_vector) # 최종 감정 벡터 가중치 적용 return (text_vector * 0.7) + (image_vector * 0.3) ####### 코사인 유사도 함수 ###### def cosine_similarity_fn(vec1, vec2): dot_product = np.dot(vec1, vec2) norm_vec1 = np.linalg.norm(vec1) norm_vec2 = np.linalg.norm(vec2) if norm_vec1 == 0 or norm_vec2 == 0: return np.nan # 제로 벡터인 경우 NaN 반환 return dot_product / (norm_vec1 * norm_vec2) ####### 이미지 다운로드 함수 (PIL 객체 반환) ###### def download_image(image_url): try: response = requests.get(image_url) response.raise_for_status() return Image.open(requests.get(image_url, stream=True).raw) except Exception as e: print(f"이미지 다운로드 오류: {e}") return None # 스타일 옵션 options = { 1: "🌼 친근한", 2: "🔥 트렌디한 MZ세대", 3: "😄 유머러스한 장난꾸러기", 4: "🧘 차분한 명상가", 5: "🎨 창의적인 예술가", } # 일기 분석 함수 def chatbot_diary_with_image(style_option, diary_input, image_input, playlist_input): style = options.get(int(style_option.split('.')[0]), "🌼 친근한") # GPT 응답 (일기 코멘트) try: response_comment = openai.ChatCompletion.create( model="gpt-4-turbo", messages=[{"role": "system", "content": f"너는 {style} 챗봇이야."}, {"role": "user", "content": diary_input}], ) comment = response_comment.choices[0].message.content except Exception as e: comment = f"💬 오류: {e}" # GPT 기반 일기 주제 추천 try: topics = get_initial_response(style_option, diary_input) except Exception as e: topics = f"📝 주제 추천 오류: {e}" # DALL·E 3 이미지 생성 요청 (3D 스타일 캐릭터) try: response = openai.Image.create( model="dall-e-3", prompt=( f"{diary_input}를 반영해서 감정을 표현하는 3D 스타일의 일러스트 캐릭터를 그려줘. " "캐릭터는 부드럽고 둥근 디자인에 표정이 감정을 잘 드러내야 해. " "감정을 시각적으로 표현할 수 있는 소품이나 작은 상징을 포함해줘. " "감정의 분위기를 반영하는 선명하고 깨끗한 색상을 사용하고, 캐릭터가 역동적이고 재미있는 자세를 취할 수 있도록 해줘. " "이미지에는 하나의 캐릭터만 나오게 해줘." "배경은 단순하고 밝은 색상으로 설정해서 캐릭터가 강조될 수 있도록 해줘." ), size="1024x1024", n=1 ) # URL 가져오기 및 다운로드 image_url = response['data'][0]['url'] print(f"Generated Image URL: {image_url}") # URL 확인 image = download_image(image_url) except Exception as e: print(f"이미지 생성 오류: {e}") # 오류 상세 출력 image = None # 사용자 최종 감정 벡터 final_user_emotions = generate_final_emotion_vector(diary_input,image_input) # 각 노래에 대한 코사인 유사도 계산 similarities = [cosine_similarity_fn(final_user_emotions, song_vec) for song_vec in emotions] #유효한 유사도 필터링 valid_indices = [i for i, sim in enumerate(similarities) if not np.isnan(sim)] filtered_similarities = [similarities[i] for i in valid_indices] recommendations = np.argsort(filtered_similarities)[::-1] # 높은 유사도 순으로 정렬 results_df = pd.DataFrame({ 'Singer' : melon_emotions['singer'].iloc[recommendations].values, 'title' : melon_emotions['Title'].iloc[recommendations].values, 'genre' : melon_emotions['genre'].iloc[recommendations].values, 'Cosine Similarity': [similarities[idx] for idx in recommendations] }) # 가중치 값 설정 gamma = 0.3 similar_playlists = results_df.head(5) similar_playlists = pd.merge(similar_playlists, melon_emotions, left_on="title", right_on="Title", how="inner") similar_playlists = similar_playlists[["title", "Emotions", "singer"]] dissimilar_playlists = results_df.tail(5) dissimilar_playlists = pd.merge(dissimilar_playlists, melon_emotions, left_on="title", right_on="Title", how="inner") dissimilar_playlists = dissimilar_playlists[["title", "Emotions", "singer"]] #감정과 유사한 플레이리스트 if playlist_input == '비슷한': results = [] seen_songs = set(similar_playlists["title"].values) # 초기 seen_songs에 similar_playlists의 곡들을 추가 # 사용자 감정 벡터 user_emotion_vector = generate_final_emotion_vector(diary_input, image_input).reshape(1, -1) for index, row in similar_playlists.iterrows(): song_title = row["title"] song_singer = row["singer"] song_vector = np.array(row["Emotions"]).reshape(1, -1) song_results = [] for i, emotion_vec in enumerate(emotions): emotion_title = melon_emotions.iloc[i]["Title"] emotion_singer = melon_emotions.iloc[i]["singer"] emotion_vec = np.array(emotion_vec).reshape(1, -1) # similar_playlists에 있는 곡과 seen_songs에 있는 곡은 제외 if ( emotion_title != song_title and emotion_title not in seen_songs ): try: # 곡 간 유사도(Song-Song Similarity) song_song_similarity = cosine_similarity(song_vector, emotion_vec)[0][0] # 사용자 감정 벡터와의 유사도(User-Song Similarity) user_song_similarity = cosine_similarity(user_emotion_vector, emotion_vec)[0][0] # Final Score 계산 final_score = gamma * song_song_similarity + (1 - gamma) * user_song_similarity song_results.append({ "Title": emotion_title, "Singer": emotion_singer, "Song-Song Similarity": song_song_similarity, "User-Song Similarity": user_song_similarity, "Final Score": final_score }) except ValueError as e: print(f"Error with {song_title} vs {emotion_title}: {e}") continue # Final Score를 기준으로 상위 3곡 선택 song_results = sorted(song_results, key=lambda x: x["Final Score"], reverse=True)[:3] seen_songs.update([entry["Title"] for entry in song_results]) results.append({"Song Title": song_title, "Singer": song_singer, "Top 3 Similarities": song_results}) # 결과 출력 for result in results: print(f"{result['Singer']} - {result['Song Title']}") for entry in result["Top 3 Similarities"]: print(f"{entry['Singer']} - {entry['Title']} : Final Score {entry['Final Score']:.4f}") print(f" (Song-Song Similarity: {entry['Song-Song Similarity']:.4f}, User-Song Similarity: {entry['User-Song Similarity']:.4f})") print("-" * 30) #반대 플레이리스트 if playlist_input == '상반된': results = [] seen_songs = set() # 사용자 감정 벡터 user_emotion_vector = generate_final_emotion_vector(diary_input, image_input).reshape(1, -1) for index, row in dissimilar_playlists.iterrows(): song_title = row["title"] song_singer = row["singer"] song_vector = np.array(row["Emotions"]).reshape(1, -1) song_results = [] for i, emotion_vec in enumerate(emotions): emotion_title = melon_emotions.iloc[i]["Title"] emotion_singer = melon_emotions.iloc[i]["singer"] emotion_vec = np.array(emotion_vec).reshape(1, -1) if ( emotion_title != song_title and emotion_title not in dissimilar_playlists["title"].values and emotion_title not in seen_songs ): try: # 곡 간 유사도(Song-Song Similarity) song_song_similarity = cosine_similarity(song_vector, emotion_vec)[0][0] # 사용자 감정 벡터와의 반대 유사도(User-Song Dissimilarity) opposite_user_song_similarity = 1 - cosine_similarity(user_emotion_vector, emotion_vec)[0][0] # Final Score 계산 final_score = gamma * song_song_similarity + (1 - gamma) * opposite_user_song_similarity song_results.append({ "Title": emotion_title, "Singer": emotion_singer, "Song-Song Similarity": song_song_similarity, "User-Song Dissimilarity": opposite_user_song_similarity, "Final Score": final_score }) except ValueError as e: print(f"Error with {song_title} vs {emotion_title}: {e}") continue # Final Score를 기준으로 상위 3곡 선택 (값이 큰 곡이 반대되는 곡) song_results = sorted(song_results, key=lambda x: x["Final Score"], reverse=True)[:3] seen_songs.update(entry["Title"] for entry in song_results) results.append({"Song Title": song_title, "Singer": song_singer, "Top 3 Similarities": song_results}) # 결과 출력 for result in results: print(f"{result['Singer']} - {result['Song Title']}") for entry in result["Top 3 Similarities"]: print(f"{entry['Singer']} - {entry['Title']} : Final Score {entry['Final Score']:.4f}") print(f' (Song-Song Similarity: {entry["Song-Song Similarity"]:.4f}, User-Song Dissimilarity: {entry["User-Song Dissimilarity"]:.4f})') print("-" * 30) # 데이터프레임 변환을 위한 리스트 생성 df_rows = [] for result in results: song_title = result['Song Title'] song_singer = result['Singer'] main_song_info = f"{song_singer} - {song_title}" for entry in result["Top 3 Similarities"]: combined_info = f"{entry['Singer']} - {entry['Title']}" df_rows.append({"1st 추천 플레이리스트": main_song_info, "2nd 추천 플레이리스트": combined_info}) # 데이터프레임 생성 final_music_playlist_recommendation = pd.DataFrame(df_rows) # 곡 제목 그룹화하여 첫 번째 행에만 곡 제목 표시 final_music_playlist_recommendation["1st 추천 플레이리스트"] = final_music_playlist_recommendation.groupby("1st 추천 플레이리스트")["1st 추천 플레이리스트"].transform( lambda x: [x.iloc[0]] + [""] * (len(x) - 1) ) return final_music_playlist_recommendation, comment, topics, image # 일기 주제 추천 함수 def get_initial_response(style, sentence): style = options.get(int(style.split('.')[0]), "🌼 친근한") system_prompt_momentum = ( f"너는 {style}의 챗봇이야. 사용자가 작성한 일기를 바탕으로 생각을 정리하고 내면을 돌아볼 수 있도록 " "도와주는 구체적인 일기 콘텐츠나 질문 4-5개를 추천해줘." ) try: response = openai.ChatCompletion.create( model="gpt-4-turbo", messages=[ {"role": "system", "content": system_prompt_momentum}, {"role": "user", "content": sentence} ], temperature=1 ) return response.choices[0].message.content except Exception as e: return f"📝 주제 추천 오류: {e}" # Gradio 인터페이스 with gr.Blocks() as app: gr.Markdown(""" # 📚 EmoDiary : 스마트 감정 일기 도우미 📚 **오늘의 하루를 기록하면, 그에 맞는 플레이리스트와 일기 회고 콘텐츠를 자동으로 생성해드립니다!** ### 사용 방법: 1. 오늘의 하루 기록하기: 하루 동안의 감정이나 일어난 일을 텍스트 박스에 기록하고, 자신의 감정을 반영할 수 있는 얼굴 표정 이미지를 촬영해주세요. 2. AI 스타일 선택하기: 자신의 선호에 맞는 스타일을 선택하면, 해당 스타일에 맞춘 답장과 회고 주제를 추천해드릴게요. 3. 플레이리스트 선택하기: 오늘의 감정에 맞춰 "비슷한" 또는 "상반된" 감정을 선택하면, 그에 맞는 플레이리스트를 추천해드릴게요. 4. 분석 시작: "🚀 분석 시작" 버튼을 클릭하면, 입력한 정보와 이미지를 바탕으로 추천 플레이리스트, 감정 캐릭터 이미지, 그리고 회고 주제가 생성됩니다.""") with gr.Row(): with gr.Column(): diary_input = gr.Textbox(label="📜 오늘 하루 기록하기", placeholder="ex)오늘 소풍가서 맛있는 걸 많이 먹어서 엄청 신났어") chatbot_style = gr.Radio( choices=[f"{k}. {v}" for k, v in options.items()], label="🤖 어떤 스타일의 AI에게 답장과 회고 주제를 추천받고 싶나요?" ) playlist_input = gr.Radio(["비슷한", "상반된"], label="🎧 오늘의 감정과 ㅇㅇ되는 플레이리스트를 추천받고 싶나요?") image_input = gr.Image(type="pil", label="📷 얼굴 표정 사진 업로드", width=256, height=256) submit_btn = gr.Button("🚀 분석 시작") with gr.Column(): output_playlist = gr.Dataframe(label="🎧 추천 플레이리스트 ") output_comment = gr.Textbox(label="💬 AI 코멘트") output_topics = gr.Textbox(label="📝 오늘의 내면을 돌아보는 회고 추천 주제") output_image = gr.Image(label="🖼️ 생성된 오늘의 감정 캐릭터", type="pil", width=256, height=256) # 버튼 클릭 이벤트 연결 submit_btn.click( fn=chatbot_diary_with_image, inputs=[chatbot_style, diary_input, image_input, playlist_input], outputs=[output_playlist, output_comment, output_topics, output_image] ) # 앱 실행 app.launch(debug=True)