Spaces:
Sleeping
Sleeping
import re | |
import random | |
from app_config import SYSTEM_PROMPT, SLOT_ID_PATTERN, INVOICE_NUM_PATTERN,ROOT_DIR | |
from utils.schemas import ChatBotInput, SlotScheduleInput, SlotUpdateInput | |
from PriceEstimation.classify import predict_class | |
from PriceEstimation.price_predict import waste_tyre_price | |
from langchain.agents import tool | |
from langchain_core.messages import SystemMessage, HumanMessage | |
from dotenv import load_dotenv | |
from pathlib import Path | |
import pandas as pd | |
from datetime import datetime, timedelta | |
import os | |
import json | |
env_path = Path('.') / '.env' | |
load_dotenv(dotenv_path=env_path) | |
import session_manager | |
session_state = session_manager.get_session_state() | |
def response_generator(prompt: str) -> str: | |
"""this function can be used for general quetion answers which are related to tyrex and tyre recycling | |
Args: | |
prompt (string): user query | |
Returns: | |
string: answer of the query | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "question-answer-tool" | |
prompt = session_state.last_query | |
print("my_Prompt is", prompt) | |
print() | |
print() | |
retriever = session_state.retriever | |
docs = retriever.invoke(prompt) | |
my_context = [doc.page_content for doc in docs] | |
my_context = '\n\n'.join(my_context) | |
print("***************************** CONTEXT *****************************") | |
print(my_context) | |
system_message = SystemMessage(content=SYSTEM_PROMPT.format(context=my_context, previous_message_summary=session_state.rag_memory.moving_summary_buffer)) | |
chat_messages = (system_message + session_state.rag_memory.chat_memory.messages + HumanMessage(content=prompt)).messages | |
print("messages", chat_messages) | |
print() | |
print() | |
response = session_state.llm.invoke(chat_messages) | |
print(response) | |
return response.content | |
except Exception as error: | |
print(error) | |
return "Oops! something went wrong, please try again." | |
def schedule_slot(address, tyre_counts) -> str: | |
"""this function can be used for scheduling or booking slot | |
Args: | |
address (string): Address to pickup the tyres | |
tyre_counts (int): Number of the tyres to pickup | |
Returns: | |
string: final responce which user needs | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "slot-scheduling-tool" | |
if "slot_booking_agent" in session_state.agent_history: | |
session_state.agent_history['slot_booking_agent'].append({"role": "user", "content": session_state.last_query}) | |
else: | |
session_state.agent_history['slot_booking_agent'] = [{"role": "user", "content": session_state.last_query}] | |
session_state.next_agent = "slot_booking_agent" | |
try: | |
tyre_counts = int(tyre_counts) | |
except Exception as error: | |
tyre_counts = None | |
print(error) | |
words = ['provide', 'give me ', 'address', 'need', 'want', 'needs', 'wants'] | |
is_valid_address = True | |
for word in words: | |
if word in address.lower(): | |
is_valid_address = False | |
break | |
if (not address or not is_valid_address) and not tyre_counts: | |
response = "We will need your address and number of tyres to pick up to book a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
session_state.agent_history['slot_booking_agent'].append({"role": "assistant", "content": response}) | |
return response | |
elif not address or not is_valid_address: | |
response = "We will need your address to book a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
session_state.agent_history['slot_booking_agent'].append({"role": "assistant", "content": response}) | |
return response | |
elif not tyre_counts: | |
response = "We will need number of tyres to pick up to book a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
session_state.agent_history['slot_booking_agent'].append({"role": "assistant", "content": response}) | |
return response | |
day = random.randint(1, 10) | |
pickup_time = datetime.now() + timedelta(days=day) | |
df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
df.loc[len(df)] = [f"SLOT-{len(df)}", address, tyre_counts, pickup_time.strftime('%d-%m-%Y'), "Booked"] | |
df.to_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH'))), index=False) | |
response = f"Your slot has been booked with slot id ***{df.iloc[-1]['slot_id']}*** our pickup agent will come at ***{pickup_time.strftime('%d-%m-%Y')}*** at your address ***{address}*** to pickup ***{tyre_counts}*** tyres." | |
session_state.agent_history['slot_booking_agent'] = [] | |
session_state.next_agent = "general_agent" | |
return response | |
except Exception as error: | |
print(error) | |
return error | |
def cancle_slot(inp = None) -> str: | |
"""this function can be used cancle booked slot. | |
Returns: | |
string: final responce after slot canclelation. | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "slot-canclelation-tool" | |
if "slot_canclelation_agent" in session_state.agent_history: | |
session_state.agent_history['slot_canclelation_agent'].append({"role": "user", "content": session_state.last_query}) | |
else: | |
session_state.agent_history['slot_canclelation_agent'] = [{"role": "user", "content": session_state.last_query}] | |
session_state.next_agent = "slot_canclelation_agent" | |
slot_id = None | |
for message in reversed(session_state.agent_history['slot_canclelation_agent']): | |
if message['role'] == 'user': | |
match = re.search(SLOT_ID_PATTERN, message['content']) | |
if match: | |
slot_id = match.group(0) | |
break | |
if slot_id is None: | |
response = "We will need a valid Slot id to Cancle a slot." | |
session_state.agent_history['slot_canclelation_agent'].append({"role": "assistant", "content": response}) | |
return response | |
df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
if len(df[df['slot_id'] == slot_id]) > 0: | |
df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"] = 'Cancled' | |
df.to_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH'))), index=False) | |
response = f"Your slot with slot id ***{df.iloc[-1]['slot_id']}*** has been cancleled successfully." | |
session_state.agent_history['slot_canclelation_agent'] = [] | |
session_state.next_agent = "general_agent" | |
return response | |
else: | |
response = f"We couldn't find any slot with slot id ***{slot_id}***. Please enter valid slot id." | |
session_state.agent_history['slot_canclelation_agent'].append({"role": "assistant", "content": response}) | |
return response | |
except Exception as error: | |
print(error) | |
return error | |
def update_slot(slot_id,address,tyre_counts) -> str: | |
"""this function can be used update booked slot details like address or tyre count. | |
Args: | |
slot_id (string): Slot id to cancle a slot. Slot id must be in formate 'SLOT-*', | |
address (string): Address to pickup the tyres | |
tyre_counts (int): Number of the tyres to pickup | |
Returns: | |
string: final responce after slot updation. | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "slot-update-tool" | |
if "slot_update_agent" in session_state.agent_history: | |
session_state.agent_history['slot_update_agent'].append({"role": "user", "content": session_state.last_query}) | |
else: | |
session_state.agent_history['slot_update_agent'] = [{"role": "user", "content": session_state.last_query}] | |
session_state.next_agent = "slot_update_agent" | |
slot_id = None | |
for message in reversed(session_state.agent_history['slot_update_agent']): | |
if message['role'] == 'user': | |
match = re.search(SLOT_ID_PATTERN, message['content']) | |
if match: | |
slot_id = match.group(0) | |
break | |
df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
if slot_id is None or len(df[df['slot_id']==slot_id])==0: | |
response = "We will need a valid Slot id to update a slot." | |
session_state.agent_history['slot_update_agent'].append({"role": "assistant", "content": response}) | |
return response | |
day = random.randint(1, 10) | |
pickup_time = datetime.now() + timedelta(days=day) | |
if df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"] != 'Booked': | |
response = "You can not update this slot because this slot is already " + str(df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"])+ "." | |
session_state.agent_history['slot_update_agent'] = [] | |
session_state.next_agent = "general_agent" | |
return response | |
try: | |
tyre_counts = int(tyre_counts) | |
except Exception as error: | |
tyre_counts = None | |
print(error) | |
words = ['provide', 'give me ', 'address', 'need', 'want', 'needs', 'wants'] | |
is_valid_address = True | |
for word in words: | |
if word in address.lower(): | |
is_valid_address = False | |
break | |
if (not address or not is_valid_address) and not tyre_counts: | |
response = "We will need your address or number of tyres to pick up for update a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
session_state.agent_history['slot_update_agent'].append({"role": "assistant", "content": response}) | |
return response | |
if len(df[df['slot_id'] == slot_id]) > 0 : | |
if address and is_valid_address: | |
# update address | |
df.loc[df[df['slot_id']==slot_id]['address'].index[0], "address"] = address | |
if tyre_counts: | |
# update tyre count | |
df.loc[df[df['slot_id']==slot_id]['num_of_tyres'].index[0], "num_of_tyres"] = tyre_counts | |
df.loc[df[df['slot_id']==slot_id]['pickup_date'].index[0], "pickup_date"] = pickup_time.strftime('%d-%m-%Y') | |
df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"] = 'Booked' | |
df.to_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH'))), index=False) | |
response = f"We have updated your slot with ***{slot_id}***. Now your address is ***{df[df['slot_id']==slot_id]['address'].iloc[0]}***. Number of tyres is ***{df[df['slot_id']==slot_id]['num_of_tyres'].iloc[0]}*** and new pickup time is ***{pickup_time.strftime('%d-%m-%Y')}***" | |
session_state.agent_history['slot_update_agent'] = [] | |
session_state.next_agent = "general_agent" | |
return response | |
else: | |
response = f"We couldn't find any slot with slot id ***{slot_id}***. Please enter valid slot id." | |
session_state.agent_history['slot_update_agent'].append({"role": "assistant", "content": response}) | |
return response | |
except Exception as error: | |
print(error) | |
return error | |
def get_slot_details(inp = None) -> str: | |
"""this function can be used to get details of slots. | |
Returns: | |
pandas.core.frame.DataFrame: Dataframe which contains user slot details. | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "slot-fetching-tool" | |
df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
num_of_tyres = df[df['status']=='Booked']['num_of_tyres'].sum() | |
return {"df": df, "num_of_tyres": num_of_tyres} | |
except Exception as error: | |
print(error) | |
return error | |
def get_invoice(inp = None) -> str: | |
"""this function can be used to get Invoice. | |
Returns: | |
string: final invoice. | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "get-invoice-tool" | |
if "get_invoice_agent" in session_state.agent_history: | |
session_state.agent_history['get_invoice_agent'].append({"role": "user", "content": session_state.last_query}) | |
else: | |
session_state.agent_history['get_invoice_agent'] = [{"role": "user", "content": session_state.last_query}] | |
session_state.next_agent = "get_invoice_agent" | |
invoice_num = None | |
print(session_state.agent_history['get_invoice_agent']) | |
for message in reversed(session_state.agent_history['get_invoice_agent']): | |
if message['role'] == 'user': | |
print() | |
print(message['content']) | |
match = re.search(INVOICE_NUM_PATTERN, message['content']) | |
print(match) | |
if match: | |
invoice_num = match.group(0) | |
break | |
if invoice_num is None: | |
response = "We will need a valid Invoice Number to get Invoice." | |
session_state.agent_history['get_invoice_agent'].append({"role": "assistant", "content": response}) | |
return response | |
with open(os.path.join(ROOT_DIR,str(os.getenv('INVOICE_DETAILS_PATH'))),"r") as fp: | |
invoice_data = json.load(fp=fp) | |
if invoice_num in invoice_data: | |
data = invoice_data[invoice_num] | |
# Extract the necessary information from the JSON data | |
date = data["Date"] | |
time = data["Time"] | |
company_name = data["Company Name"] | |
items = data["items"] | |
# Create the HTML string | |
part_1 = f""" | |
<html> | |
<head> | |
<style> | |
.invoice-container {{ | |
width: 100%; | |
padding: 20px; | |
}} | |
.invoice-header {{ | |
text-align: center; | |
margin-bottom: 20px; | |
}} | |
.invoice-header h1 {{ | |
margin: 0; | |
}} | |
.invoice-details, .invoice-footer {{ | |
margin-bottom: 20px; | |
}} | |
.invoice-items {{ | |
display: flex; | |
flex-direction: column; | |
}} | |
.invoice-item {{ | |
display: grid; | |
grid-template-columns: 1fr 1fr 1fr 1fr; | |
align-content: center; | |
padding: 10px; | |
border-bottom: 1px solid #ddd; | |
}} | |
.item-header {{ | |
font-weight: bold; | |
}} | |
</style> | |
</head> | |
<body> | |
<div class="invoice-container"> | |
<div class="invoice-header"> | |
<h1>INVOICE</h1> | |
<p><strong>Date:</strong> {date}</p> | |
<p><strong>Time:</strong> {time}</p> | |
<p><strong>Company Name:</strong> {company_name}</p> | |
</div> | |
<div class="invoice-details"> | |
<div class="invoice-items"> | |
<div class="invoice-item item-header"> | |
<div>Category</div> | |
<div>Quality</div> | |
<div>Quantity</div> | |
<div>Price per Unit</div> | |
</div> | |
""" | |
part_2 = "" | |
for item in items: | |
category = item["Category"] | |
quality = item["Qulity"] | |
quantity = item["Quntity"] | |
price_per_unit = item["price_per_unit"] | |
part_2 += ( | |
f"<div class='invoice-item'>" | |
f"<div>{category}</div>" | |
f"<div>{quality}</div>" | |
f"<div>{quantity}</div>" | |
f"<div>${price_per_unit}</div>" | |
f"</div>" | |
) | |
part_3 = """ | |
</div> | |
</div> | |
<div class="invoice-footer"> | |
<p><strong>Thank you for recycling tyres with us!</strong></p> | |
</div> | |
</div> | |
</body> | |
</html> | |
""" | |
# Display the HTML invoice using Streamlit | |
session_state.agent_history['get_invoice_agent'] = [] | |
session_state.next_agent = "general_agent" | |
return f"""{part_1} | |
{part_2} | |
{part_3}""" | |
else: | |
response = f"We couldn't find any invoice with invoice number ***{invoice_num}***. Please enter valid invoice number." | |
session_state.agent_history['get_invoice_agent'].append({"role": "assistant", "content": response}) | |
return response | |
except Exception as error: | |
print(error) | |
return error | |
def price_estimation(inp = None) -> str: | |
"""this function can be used to estimate price of the tyre. | |
Returns: | |
string: response which contains estimated price of tyre. | |
""" | |
try: | |
session_state = session_manager.get_session_state() | |
session_state.last_tool = "price-estimation-tool" | |
if "price_estimation_agent" in session_state.agent_history: | |
session_state.agent_history['price_estimation_agent'].append({"role": "user", "content": session_state.last_query}) | |
else: | |
session_state.agent_history['price_estimation_agent'] = [{"role": "user", "content": session_state.last_query}] | |
session_state.next_agent = "price_estimation_agent" | |
is_valid_file = Path("Tyre.png").is_file() | |
category = None | |
for message in reversed(session_state.agent_history['price_estimation_agent']): | |
if message['role'] == 'user': | |
for word in ['bike', 'car', 'truck']: | |
if word in message['content'].lower(): | |
category = word | |
break | |
if not is_valid_file and not category: | |
response = "Please provide valid image of tyre and category of tyre. Category of tyre must be from car, bike or truck. Let me know once you uploaded a image of tyre." | |
session_state.agent_history['price_estimation_agent'].append({"role": "assistant", "content": response}) | |
return response | |
elif not category: | |
response = "Please provide valid category of tyre. Category of tyre must be from car, bike or truck." | |
session_state.agent_history['price_estimation_agent'].append({"role": "assistant", "content": response}) | |
return response | |
elif not is_valid_file: | |
response = "Plesase provide a image of tyre and let me know once you upload an image." | |
session_state.agent_history['price_estimation_agent'].append({"role": "assistant", "content": response}) | |
return response | |
tyre_quality = predict_class(os.path.join(ROOT_DIR,str(os.getenv('QULITY_PREDICT_MODEL_PATH'))), 'Tyre.png') | |
print(tyre_quality) | |
tyre_price = waste_tyre_price(tyre_quality, category.lower()) | |
print(tyre_price) | |
session_state.agent_history['price_estimation_agent'] = [] | |
session_state.next_agent = "general_agent" | |
return f"Your tyre quality is {tyre_quality} and estimated price of your tyre is {tyre_price}" | |
except Exception as error: | |
print(error) | |
return error | |
def cancle_ongoing_process(inp = None) -> str: | |
"""This function can be used to cancle any ongoing process. | |
Returns: | |
str: response after canclelation. | |
""" | |
session_state = session_manager.get_session_state() | |
cancle_response = "" | |
if session_state.next_agent != "general_agent": | |
session_state.next_agent = "general_agent" | |
cancle_response = "Your ongoing process got cancleled." | |
else: | |
cancle_response = "You don't have any ongoing process to cancle." | |
session_state.agent_history = {} | |
return cancle_response | |