| """ |
| LLM-based recommender that chooses the best content to recommend to each user, given the user's profile and our goal. |
| |
| """ |
| import json |
| import os |
| import random |
|
|
| import pandas as pd |
| import openai |
| from openai import OpenAI |
| from dotenv import load_dotenv |
| import time |
| import streamlit as st |
| from tqdm import tqdm |
| from Messaging_system.Homepage_Recommender import DefaultRec |
| load_dotenv() |
|
|
|
|
| |
class LLMR:
    """LLM-based recommender that picks one content item per user.

    For every row of ``CoreConfig.users_df`` it either asks an OpenAI chat model to
    choose among the user's top recsys candidates (default mode) or picks a random
    valid candidate (``random=True``). Users without a usable pick receive the
    ``DefaultRec`` values.
    """

    def __init__(self, CoreConfig, random=False):
        """
        :param CoreConfig: shared configuration/state object. The methods of this class
            read ``users_df``, ``content_info``, ``popular_contents_df``,
            ``recsys_contents``, ``config_file``, ``api_key``, ``model`` and
            ``total_tokens`` from it.
        :param random: when True, pick a random valid candidate instead of querying the LLM.
        """
        self.Core = CoreConfig
        self.user = None  # row of users_df currently being processed
        self.selected_content_ids = []  # candidate ids offered for the current user
        self.random = random

    def get_recommendations(self, progress_callback):
        """
        Select the recommended content for each user in ``self.Core.users_df``.

        Fills the ``recommendation``, ``recommendation_info`` and ``recsys_result``
        columns for every row.

        :param progress_callback: optional ``callable(progress, total_users)`` invoked
            before each user is processed; may be None.
        :return: the updated ``self.Core``
        """
        default = DefaultRec(self.Core)
        users_df = self.Core.users_df

        users_df["recommendation"] = None
        users_df["recommendation_info"] = None
        total_users = len(users_df)

        st.write("Choosing the best content to recommend ... ")

        self.Core.start_time = time.time()
        rows = tqdm(users_df.iterrows(), total=total_users,
                    desc="Selecting the best content to recommend ...")
        for progress, (idx, row) in enumerate(rows):
            if progress_callback is not None:
                progress_callback(progress, total_users)

            self.user = row
            recommendation_dict, content_info, recsys_json, token = self._get_recommendation()

            # _get_recommendation returns an all-None tuple when nothing usable was found.
            if not recommendation_dict or recommendation_dict.get("content_id") is None:
                users_df.at[idx, "recommendation"] = default.recommendation
                users_df.at[idx, "recommendation_info"] = default.recommendation_info
                users_df.at[idx, "recsys_result"] = default.for_you_url
                continue

            prompt_tokens = int(token["prompt_tokens"])
            completion_tokens = int(token["completion_tokens"])
            self.Core.total_tokens["prompt_tokens"] += prompt_tokens
            self.Core.total_tokens["completion_tokens"] += completion_tokens
            self.Core.temp_token_counter = prompt_tokens + completion_tokens
            users_df.at[idx, "recommendation"] = recommendation_dict
            users_df.at[idx, "recommendation_info"] = content_info
            users_df.at[idx, "recsys_result"] = recsys_json
            self.Core.respect_request_ratio()

        return self.Core

    def _get_recommendation(self):
        """
        Select the recommendation for ``self.user`` from its available contents.

        :return: (recommendation_dict, content_info, recsys_json, tokens), or a tuple of
            four Nones when no valid recommendation could be produced.
        """
        if self.random:
            return self._get_recommendation_random()

        prompt, recsys_json = self._generate_prompt()
        if prompt is None:
            return None, None, None, None

        content_id, tokens = self.get_llm_response(prompt)
        if content_id == 0:  # get_llm_response signals failure with 0
            return None, None, None, None

        content_info = self._get_content_info(content_id)
        recommendation_dict = self._get_recommendation_info(content_id, json.loads(recsys_json))
        if content_info is None or recommendation_dict is None:
            return None, None, None, None
        return recommendation_dict, content_info, recsys_json, tokens

    def _generate_prompt(self):
        """
        Build the LLM prompt for ``self.user``.

        :return: (prompt, recsys_json), or (None, None) when there is nothing to choose from.
        """
        available_contents, recsys_json = self._get_available_contents()
        if not available_contents.strip():
            return None, None

        input_context = self._input_context()
        user_info = self._get_user_profile()
        task = self._task_instructions()
        output_instruction = self._output_instruction()

        prompt = f"""
        ### Context:
        {input_context}

        ### User Information:
        {user_info}

        ### Available Contents:
        {available_contents}

        ### Main Task:
        {task}

        ### Output Instructions:
        {output_instruction}
        """
        return prompt, recsys_json

    def _input_context(self):
        """
        :return: the "Context" section of the prompt as a string
        """
        context = """
        You are a helpful educational music content recommender. Your goal is to choose a perfect content to recommend to the user given the information that we have from the user and available contents to recommend.
        """
        return context

    def _system_instructions(self):
        """
        :return: the system-role message sent with every chat completion request
        """
        context = """
        You are a helpful educational music content recommender
        """
        return context

    def _task_instructions(self):
        """
        :return: the "Main Task" section of the prompt as a string
        """
        task = """
        - You must select exactly ONE content from the 'Available Contents' to recommend.
        - Base your decision on the User information and focus on providing the most relevant recommendation.
        - Do not recommend content where the topic is focused on a specific Gear (e.g. YAMAHA)
        - Provide the content_id of the recommended content in the output based on Output instructions.
        """
        return task

    def _get_user_profile(self):
        """
        Describe the user's information and last completed content for the prompt.

        :return: the "User Information" section of the prompt as a string
        """
        last_completed_content = self._get_user_data(attribute="last_completed_content")
        user_info = self._get_user_data(attribute="user_info")

        recommendation_info = f"""
        **User information and preferences:**

        {user_info}

        **Previous completed content:**
        {last_completed_content}
        """
        return recommendation_info

    def _get_user_data(self, attribute):
        """
        Return ``self.user[attribute]``, or "Not Available" when it is missing/empty.

        None, NaN/NA, blank strings and empty containers count as missing.

        :param attribute: column name to read from the current user row
        :return: the stored value, or the string "Not Available"
        """
        value = self.user[attribute]
        if value is None:
            return "Not Available"
        if isinstance(value, str):
            return value if value.strip() else "Not Available"
        if isinstance(value, (list, tuple, dict, set)):
            # pd.isna on a container returns an array, so handle emptiness directly.
            return value if value else "Not Available"
        try:
            missing = bool(pd.isna(value))
        except (TypeError, ValueError):
            # Array-like values make pd.isna return an array; treat them as present.
            missing = False
        return "Not Available" if missing else value

    def _get_user_recommendation(self):
        """
        Return the user's recsys JSON string, or the popular-content JSON fallback.

        The user's own result is used only when it parses to a dict containing at least
        one of ``self.Core.recsys_contents``.
        """
        recsys_json = self.user["recsys_result"]
        try:
            recsys_data = json.loads(recsys_json)
        except (TypeError, ValueError):  # None/NaN or malformed JSON
            recsys_data = None

        sections = self.Core.recsys_contents
        if isinstance(recsys_data, dict) and any(section in recsys_data for section in sections):
            return recsys_json
        return self.Core.popular_contents_df.iloc[0]["popular_content"]

    def _get_available_contents(self):
        """
        Build the "Available Contents" prompt section for the current user.

        Takes the top 3 items (by ``recommendation_rank``) of every section in
        ``self.Core.recsys_contents``, drops duplicates and banned ids, and stores the
        resulting ids in ``self.selected_content_ids`` so the LLM answer can be validated.

        :return: (text, recsys_json) -- text is "" when no candidate is available
        """
        recsys_json = self._get_user_recommendation()
        recsys_data = json.loads(recsys_json)
        banned_contents = self._banned_contents()

        selected_content_ids = []
        for rec in self._collect_top_k(recsys_data, self.Core.recsys_contents, k=3):
            content_id = rec.get("content_id")
            # The same item can be ranked in several sections; offer it only once.
            if (content_id is None or content_id in banned_contents
                    or content_id in selected_content_ids):
                continue
            selected_content_ids.append(content_id)

        content_df = self.Core.content_info
        matching_rows = content_df[content_df["content_id"].isin(selected_content_ids)]
        content_info_map = dict(zip(matching_rows["content_id"], matching_rows["content_info"]))

        lines = []
        for content_id in selected_content_ids:
            lines.extend([
                f"**content_id**: {content_id}",
                "**content_info**:",
                str(content_info_map.get(content_id, "No content info found")),
                "",
            ])

        self.selected_content_ids = selected_content_ids
        return "\n".join(lines), recsys_json

    def _banned_contents(self):
        """
        :return: set of content_ids listed under ``banned_contents`` in the config (may be empty)
        """
        config = getattr(self.Core, "config_file", None) or {}
        return set(config.get("banned_contents") or [])

    def _get_content_info(self, content_id):
        """
        Look up the ``content_info`` text of a content item.

        :param content_id: id to look up in ``self.Core.content_info``
        :return: the first matching ``content_info`` value, or None when the id is unknown
        """
        content_df = self.Core.content_info
        matches = content_df.loc[content_df["content_id"] == content_id, "content_info"]
        if matches.empty:
            return None
        return matches.iloc[0]

    def is_valid_content_id(self, content_id):
        """
        Check whether the LLM's answer is one of the ids offered in the prompt.

        :param content_id: id returned by the LLM
        :return: True if it is in ``self.selected_content_ids``
        """
        return content_id in self.selected_content_ids

    def _output_instruction(self):
        """
        :return: the "Output Instructions" section of the prompt as a string
        """
        instructions = """
        Return the content_id of the final recommendation in **JSON** format with the following structure:

        {
            "content_id": "content_id of the recommended content from Available Contents, as an integer"
        }

        Do not include any additional keys or text outside the JSON.
        """
        return instructions

    def get_llm_response(self, prompt, max_retries=4):
        """
        Send the prompt to the LLM and return the chosen content_id.

        Retries up to ``max_retries`` times on API errors and on malformed/invalid
        answers. It backs off exponentially after connection and rate-limit errors.

        :param prompt: user-role message produced by ``_generate_prompt``
        :param max_retries: maximum number of request attempts
        :return: (content_id, tokens) on success, or (0, 0) after all retries failed
        """
        openai.api_key = self.Core.api_key
        instructions = self._system_instructions()
        client = OpenAI(api_key=self.Core.api_key)

        for attempt in range(max_retries):
            try:
                response = client.chat.completions.create(
                    model=self.Core.model,
                    response_format={"type": "json_object"},
                    messages=[
                        {"role": "system", "content": instructions},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=20,
                    n=1,
                    temperature=0.7,
                )
            except openai.APIConnectionError as e:
                print("The server could not be reached")
                print(e.__cause__)
                self._sleep_before_retry(attempt, max_retries)
                continue
            except openai.RateLimitError:
                print("A 429 status code was received; we should back off a bit.")
                self._sleep_before_retry(attempt, max_retries)
                continue
            except openai.APIStatusError as e:  # must follow RateLimitError (its subclass)
                print("Another non-200-range status code was received")
                print(e.status_code)
                print(e.response)
                continue

            tokens = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
            content_id = self._parse_content_id(response.choices[0].message.content, attempt)
            if content_id is not None:
                return content_id, tokens

        print("Max retries exceeded. Returning empty response.")
        return 0, 0

    @staticmethod
    def _sleep_before_retry(attempt, max_retries):
        """Exponential back-off (1s, 2s, 4s, ...) unless this was the last attempt."""
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)

    def _parse_content_id(self, content, attempt):
        """
        Parse the LLM message into a validated integer content_id.

        :param content: raw message content (may be None)
        :param attempt: zero-based attempt number, used in log messages
        :return: the content_id, or None when the answer is unusable
        """
        try:
            output = json.loads(content)
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError; None -> TypeError
            print(f"Invalid JSON from LLM on attempt {attempt + 1}. Retrying...")
            return None

        try:
            content_id = int(output["content_id"])
        except (KeyError, TypeError, ValueError):  # missing key, non-dict JSON, non-integer id
            content_id = None

        if content_id is None or not self.is_valid_content_id(content_id):
            print(f"'content_id' missing or invalid in response on attempt {attempt + 1}. Retrying...")
            return None
        return content_id

    def _get_recommendation_random(self):
        """
        Randomly pick ONE valid item from the top-5 of each requested section.

        If no candidate of the user's own recsys result is usable, the same is tried on
        the popular-content fallback. The picked item is removed from every section of
        the returned recsys JSON.

        :return: (recommendation_dict, content_info, updated_recsys_json, zero_tokens_dict),
            or a tuple of four Nones when nothing could be picked.
        """
        recsys_json = self._get_user_recommendation()
        try:
            recsys_data = json.loads(recsys_json) if recsys_json else {}
        except (TypeError, ValueError):
            recsys_data = {}
        if not isinstance(recsys_data, dict):
            recsys_data = {}

        sections = self.Core.recsys_contents

        candidates = self.build_unique_candidates(recsys_data, sections)
        picked_id, recommendation_dict, content_info = self._pick_random_candidate(candidates, recsys_data)

        if picked_id is None:
            # Nothing usable in the user's own list: retry with popular contents.
            recsys_data = self._get_popular_fallback_json(k=5)
            candidates = self.build_unique_candidates(recsys_data, sections)
            picked_id, recommendation_dict, content_info = self._pick_random_candidate(candidates, recsys_data)

        if picked_id is None:
            return None, None, None, None

        recsys_data = self._remove_selected_from_all(recsys_data, picked_id)
        self.selected_content_ids = [r["content_id"] for r in candidates if r.get("content_id")]

        zero_tokens = {"prompt_tokens": 0, "completion_tokens": 0}
        return recommendation_dict, content_info, json.dumps(recsys_data), zero_tokens

    def _pick_random_candidate(self, candidates, source_data):
        """Try the candidates in random order; return (picked_id, rec_dict, content_info) or Nones."""
        if not candidates:
            return None, None, None
        order = list(range(len(candidates)))
        random.shuffle(order)
        return self.try_pick_from_candidates(order, candidates, source_data)

    def build_unique_candidates(self, src_data, sections):
        """
        Collect the top-5 items of every section, keeping the first occurrence of each id.

        :param src_data: recsys-like dict ``{section: [rec_dict, ...]}``
        :param sections: section names to read, in order
        :return: list of rec dicts with distinct, truthy ``content_id`` values
        """
        seen, uniq = set(), []
        for rec in self._collect_top_k(src_data, sections, k=5):
            cid = rec.get("content_id")
            if cid and cid not in seen:
                seen.add(cid)
                uniq.append(rec)
        return uniq

    def try_pick_from_candidates(self, idxs, candidates, source_data):
        """
        Iterate candidates in the given order and return the first valid pick.

        A candidate is valid when its id is truthy and not banned. It must also have
        non-empty content info and an entry in ``source_data``.

        :param idxs: indices into ``candidates`` giving the visiting order
        :param candidates: list of rec dicts
        :param source_data: recsys-like dict used to build the recommendation dict
        :return: (picked_id, recommendation_dict, content_info) or (None, None, None)
        """
        banned_contents = self._banned_contents()

        for i in idxs:
            picked_id = candidates[i].get("content_id")
            if not picked_id or picked_id in banned_contents:
                continue
            try:
                content_info = self._get_content_info(picked_id)
                if not content_info:
                    continue
                recommendation_dict = self._get_recommendation_info(picked_id, source_data)
            except Exception as exc:  # best effort: one bad candidate must not abort the pick
                print(f"Skipping candidate {picked_id}: {exc!r}")
                continue
            if recommendation_dict is None:
                continue
            return picked_id, recommendation_dict, content_info
        return None, None, None

    def _get_recommendation_info(self, content_id, recsys_data):
        """
        Build the recommendation dict for ``content_id`` from its first recsys entry.

        :param content_id: id to search for
        :param recsys_data: recsys-like dict ``{section: [rec_dict, ...]}``; non-list
            sections and non-dict items are ignored
        :return: dict with content_id, web_url_path, title, thumbnail_url, or None if not found
        """
        found_item = None
        for items in recsys_data.values():
            if not isinstance(items, list):
                continue
            found_item = next(
                (item for item in items
                 if isinstance(item, dict) and item.get("content_id") == content_id),
                None,
            )
            if found_item is not None:
                break

        if found_item is None:
            print(f"content_id {content_id} not found in recsys_data")
            return None

        return {
            "content_id": content_id,
            "web_url_path": found_item.get("web_url_path"),
            "title": found_item.get("title"),
            "thumbnail_url": found_item.get("thumbnail_url"),
        }

    @staticmethod
    def _rank_key(rec):
        """Sort key: recommendation_rank ascending, missing/None ranks last."""
        rank = rec.get("recommendation_rank")
        return float("inf") if rank is None else rank

    def _collect_top_k(self, recsys_data: dict, sections, k: int = 5):
        """
        From each section, grab the top-k items by recommendation_rank (ascending).

        Sections that are missing or not lists are skipped, as are non-dict items.

        :return: a flat list of rec dicts, section by section
        """
        out = []
        for sec in sections:
            recs = recsys_data.get(sec, [])
            if not isinstance(recs, list):
                continue
            ranked = sorted((rec for rec in recs if isinstance(rec, dict)), key=self._rank_key)
            out.extend(ranked[:k])
        return out

    def _get_popular_fallback_json(self, k: int = 5):
        """
        Build a recsys-like dict from popular contents.

        Reads ``self.Core.popular_contents_df.iloc[0]['popular_content']``. The string
        is expected to be JSON of the form
        ``{section: [{content_id, recommendation_rank, ...}, ...], ...}``.

        :return: ``{section: top-k items}`` for every section in ``self.Core.recsys_contents``,
            or {} when the popular list is missing or malformed
        """
        try:
            popular_json = self.Core.popular_contents_df.iloc[0]["popular_content"]
            data = json.loads(popular_json)
        except Exception:
            # Best effort: a missing or malformed popular list just means "no fallback".
            return {}
        if not isinstance(data, dict):
            return {}
        return {sec: self._collect_top_k(data, [sec], k=k) for sec in self.Core.recsys_contents}

    def _remove_selected_from_all(self, recsys_data: dict, content_id):
        """
        Remove the chosen content_id from every section so it won't be recommended again.

        Mutates and returns ``recsys_data``.
        """
        for sec, recs in list(recsys_data.items()):
            if isinstance(recs, list):
                recsys_data[sec] = [
                    r for r in recs
                    if not (isinstance(r, dict) and r.get("content_id") == content_id)
                ]
        return recsys_data
|
|
|
|