| from unsloth import FastLanguageModel |
| from peft import PeftModel |
| import pandas as pd |
| from unsloth.chat_templates import get_chat_template |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics.pairwise import cosine_similarity |
| from sentence_transformers import SentenceTransformer, util |
| import nltk |
| import json |
| from google.oauth2.service_account import Credentials |
| import gspread |
| import gradio as gr |
|
|
| |
| nltk.download("stopwords") |
| from nltk.corpus import stopwords |
|
|
| |
| model, tokenizer = FastLanguageModel.from_pretrained( |
| model_name="unsloth/Llama-3.2-3B-Instruct", |
| max_seq_length=2048, |
| dtype=None, |
| load_in_4bit=True |
| ) |
| adapter_path = "FosterSystemsDatabase/model" |
| model = PeftModel.from_pretrained(model, adapter_path) |
|
|
| |
| file_path = 'Clean Missouri Data.csv' |
| df = pd.read_csv(file_path, encoding='MacRoman') |
|
|
| def search_relevant_policies(query, df, top_n=10, max_chars=40000): |
| tfidf = TfidfVectorizer(stop_words='english') |
| tfidf_matrix = tfidf.fit_transform(df['Content']) |
| query_vector = tfidf.transform([query]) |
| cosine_sim = cosine_similarity(query_vector, tfidf_matrix).flatten() |
|
|
| top_indices = cosine_sim.argsort()[-top_n:][::-1] |
| relevant_policies = df.iloc[top_indices].copy() |
|
|
| char_count = 0 |
| valid_indices = [] |
| for idx, row in relevant_policies.iterrows(): |
| content_length = len(row["Content"]) |
| if char_count + content_length > max_chars: |
| break |
| char_count += content_length |
| valid_indices.append(idx) |
|
|
| truncated_policies = relevant_policies.loc[valid_indices] |
| return truncated_policies |
|
|
| def get_content_after_query(response_text, query): |
| query_position = response_text.lower().find(query.lower()) |
| if query_position != -1: |
| res = response_text[query_position + len(query):].strip() |
| return res[11:] |
| else: |
| return response_text.strip() |
|
|
| def process_query(query, tokenizer): |
| relevant_policies = search_relevant_policies(query, df) |
| formatted_policies = [row['Content'] for _, row in relevant_policies.iterrows()] |
| relevant_policy_text = "\n\n".join(formatted_policies) |
|
|
| messages_with_relevant_policies = [ |
| {"role": "system", "content": relevant_policy_text}, |
| {"role": "user", "content": query}, |
| ] |
|
|
| tokenizer = get_chat_template(tokenizer, chat_template="llama-3.1") |
| inputs = tokenizer.apply_chat_template( |
| messages_with_relevant_policies, |
| tokenize=True, |
| add_generation_prompt=True, |
| return_tensors="pt" |
| ).to("cuda") |
|
|
| FastLanguageModel.for_inference(model) |
| outputs = model.generate(input_ids=inputs, max_new_tokens=512, use_cache=True, temperature=0.7, min_p=0.1) |
| generated_response = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0] |
| response = get_content_after_query(generated_response, query) |
|
|
| model_sbert = SentenceTransformer('all-MiniLM-L6-v2') |
| response_embedding = model_sbert.encode(generated_response, convert_to_tensor=True) |
| policy_embeddings = model_sbert.encode(relevant_policies['Content'].tolist(), convert_to_tensor=True) |
| cosine_similarities = util.cos_sim(response_embedding, policy_embeddings).flatten() |
| most_relevant_index = cosine_similarities.argmax().item() |
| most_relevant_link = relevant_policies.iloc[most_relevant_index]['Link to Content'] |
|
|
| return { |
| "response": response, |
| "most_relevant_link": most_relevant_link |
| } |
|
|
| |
| json_file_path = "fostercare-449201-85282f81c3b7.json" |
| with open(json_file_path, 'r') as file: |
| service_account_data = json.load(file) |
| scopes = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"] |
| creds = Credentials.from_service_account_info(service_account_data, scopes=scopes) |
| client = gspread.authorize(creds) |
| spreadsheet = client.open("Fostercare Responses").sheet1 |
|
|
| |
| def greet(query): |
| result_1 = process_query(query, tokenizer) |
| result_2 = process_query(query, tokenizer) |
| return [result_1["response"], result_2["response"]] |
|
|
| def choose_preference(name, output1, output2, preference, query, broken): |
| if not name: |
| return "Please enter your name before submitting." |
|
|
| broken_flag = "Yes" if broken else "No" |
|
|
| if preference == "Output 1": |
| new_row = [query, output1, output2, name, broken_flag] |
| spreadsheet.append_row(new_row) |
| return f"You preferred: Output 1 - {output1}" |
| elif preference == "Output 2": |
| new_row = [query, output2, output1, name, broken_flag] |
| spreadsheet.append_row(new_row) |
| return f"You preferred: Output 2 - {output2}" |
| else: |
| return "No preference selected." |
|
|
| |
| with gr.Blocks() as demo: |
| name_input = gr.Textbox(label="Enter your name") |
| query_input = gr.Textbox(label="Enter your query") |
| generate_button = gr.Button("Generate Outputs") |
| output_1 = gr.Textbox(label="Output 1", interactive=False) |
| output_2 = gr.Textbox(label="Output 2", interactive=False) |
| preference = gr.Radio(["Output 1", "Output 2"], label="Choose your preferred output") |
| broken_flag = gr.Checkbox(label="Mark as Broken Response") |
| preference_result = gr.Textbox(label="Preference Result", interactive=False) |
| submit_button = gr.Button("Submit Preference") |
|
|
| generate_button.click(greet, inputs=query_input, outputs=[output_1, output_2]) |
| submit_button.click( |
| choose_preference, |
| inputs=[name_input, output_1, output_2, preference, query_input, broken_flag], |
| outputs=preference_result |
| ).then( |
| fn=lambda: ("", "", "", "", "", False, ""), |
| inputs=[], |
| outputs=[name_input, query_input, output_1, output_2, preference, broken_flag, preference_result] |
| ) |
|
|
| demo.launch() |