|
import gradio as gr |
|
from mistralai.client import MistralClient |
|
from mistralai.models.chat_completion import ChatMessage |
|
import os |
|
import pandas as pd |
|
import numpy as np |
|
from groq import Groq |
|
import anthropic |
|
from users_management import update_json, users |
|
from code_df_custom import load_excel |
|
import zipfile |
|
|
|
|
|
|
|
def ask_llm(query, user_input, client_index, user): |
|
messages = [ |
|
{ |
|
"role": "system", |
|
"content": f"You are a helpful assistant. Only show your final response to the **User Query**! Do not provide any explanations or details: \n# User Query:\n{query}." |
|
}, |
|
{ |
|
"role": "user", |
|
"content": user_input, |
|
} |
|
] |
|
|
|
systemC = messages[0]["content"] |
|
messageC = [{ |
|
"role": "user", |
|
"content": [{ |
|
"type": "text", |
|
"text": user_input |
|
}] |
|
}] |
|
|
|
if "Mistral" in client_index: |
|
client = MistralClient(api_key=os.environ[user['api_keys']['mistral']]) |
|
model_map = { |
|
"Mistral Tiny": "mistral-tiny", |
|
"Mistral Small": "mistral-small-latest", |
|
"Mistral Medium": "mistral-medium", |
|
} |
|
chat_completion = client.chat(messages=messages, model=model_map[client_index]) |
|
|
|
elif "Claude" in client_index: |
|
client = anthropic.Anthropic(api_key=os.environ[user['api_keys']['claude']]) |
|
model_map = { |
|
"Claude Sonnet": "claude-3-sonnet-20240229", |
|
"Claude Opus": "claude-3-opus-20240229", |
|
} |
|
response = client.messages.create( |
|
model=model_map[client_index], |
|
max_tokens=350, |
|
temperature=0, |
|
system=systemC, |
|
messages=messageC |
|
) |
|
return response.content[0].text |
|
|
|
elif client_index == "Groq (mixtral)": |
|
client = Groq(api_key=os.environ[user['api_keys']['groq']]) |
|
chat_completion = client.chat.completions.create( |
|
messages=messages, |
|
model='mixtral-8x7b-32768', |
|
) |
|
|
|
else: |
|
raise ValueError("Unsupported client index provided") |
|
|
|
|
|
return chat_completion.choices[0].message.content if client_index != "Claude" else chat_completion |
|
|
|
|
|
|
|
|
|
def filter_df(df, column_name, keywords): |
|
if len(keywords)>0: |
|
if column_name in df.columns: |
|
contains_keyword = lambda x: any(keyword.lower() in (x.lower() if type(x)==str else '') for keyword in keywords) |
|
filtered_df = df[df[column_name].apply(contains_keyword)] |
|
else: |
|
contains_keyword = lambda row: any(keyword.lower() in (str(cell).lower() if isinstance(cell, str) else '') for keyword in keywords for cell in row) |
|
filtered_df = df[df.apply(contains_keyword, axis=1)] |
|
else: |
|
filtered_df = df |
|
return filtered_df |
|
|
|
def chat_with_mistral(source_cols, dest_col, prompt, excel_file, url, search_col, keywords, client, user): |
|
new_prompts, new_keywords, new_user, conf_file_path = update_json(user, prompt, keywords) |
|
print(f'xlsxfile = {excel_file}') |
|
df = pd.read_excel(excel_file) |
|
df[dest_col] = "" |
|
if excel_file: |
|
file_name = excel_file.split('.xlsx')[0] + "_with_" + dest_col.replace(' ', '_') + ".xlsx" |
|
elif url.endswith('Docs/', 'Docs'): |
|
file_name = url.split("/Docs")[0].split("/")[-1] + ".xlsx" |
|
else: |
|
file_name = "meeting_recap_grid.xlsx" |
|
|
|
print(f"Keywords: {keywords}") |
|
|
|
filtred_df = filter_df(df, search_col, keywords) |
|
|
|
cpt = 1 |
|
for index, row in filtred_df.iterrows(): |
|
concatenated_content = "\n\n".join(f"{column_name}: {str(row[column_name])}" for column_name in source_cols) |
|
if not concatenated_content == "\n\n".join(f"{column_name}: nan" for column_name in source_cols): |
|
llm_answer = ask_llm(prompt[0], concatenated_content, client, user) |
|
print(f"{cpt}/{len(filtred_df)}\nQUERY:\n{prompt[0]}\nCONTENT:\n{concatenated_content[:200]}...\n\nANSWER:\n{llm_answer}") |
|
df.at[index, dest_col] = llm_answer |
|
cpt += 1 |
|
|
|
|
|
df.to_excel(file_name, index=False) |
|
|
|
zip_file_path = 'config_file.zip' |
|
|
|
with zipfile.ZipFile(zip_file_path, 'w') as zipf: |
|
zipf.write(conf_file_path, os.path.basename(conf_file_path)) |
|
|
|
return file_name, df.head(5), new_prompts, new_keywords, new_user, zip_file_path |
|
|
|
|
|
def get_columns(file,progress=gr.Progress()): |
|
if file is not None: |
|
|
|
filename, df = load_excel(file) |
|
columns = list(df.columns) |
|
return gr.update(choices=columns), gr.update(choices=columns), gr.update(choices=columns), gr.update(choices=columns + [""]), gr.update(choices=columns + ['[ALL]']), df.head(5), filename, df |
|
else: |
|
return gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[]), pd.DataFrame(), '', pd.DataFrame() |