|
import gradio as gr |
|
from gradio_multimodalchatbot import MultimodalChatbot |
|
from gradio.data_classes import FileData |
|
import os |
|
import pandas as pd |
|
import requests |
|
from PIL import Image, UnidentifiedImageError |
|
from io import BytesIO |
|
import matplotlib.pyplot as plt |
|
import urllib3 |
|
from transformers import pipeline |
|
from transformers import BitsAndBytesConfig |
|
import torch |
|
import textwrap |
|
import pandas as pd |
|
import numpy as np |
|
from haversine import haversine |
|
from transformers import AutoProcessor, LlavaForConditionalGeneration |
|
from transformers import BitsAndBytesConfig |
|
import torch |
|
from huggingface_hub import InferenceClient |
|
from transformers import AutoTokenizer |
|
from transformers import AutoImageProcessor |
|
from datasets import load_dataset |
|
from geopy.geocoders import Nominatim |
|
import pyarrow |
|
import spaces |
|
# Environment-derived flags. The variable names suggest they are provided by
# the Hugging Face Spaces runtime -- NOTE(review): confirm against Spaces docs.
IS_SPACES_ZERO = os.getenv("SPACES_ZERO_GPU", "0") == "1"
IS_SPACE = "SPACE_ID" in os.environ

# Use the GPU whenever torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Opt-in switch: only the exact value "1" enables it.
LOW_MEMORY = os.environ.get("LOW_MEMORY", "0") == "1"

# Hugging Face model identifiers: MODEL_ID feeds the image-to-text pipeline,
# TEXT_MODEL_ID feeds the text-generation pipeline.
MODEL_ID = "llava-hf/llava-1.5-7b-hf"
TEXT_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"

print(f"Using device: {DEVICE}")
print(f"Low memory: {LOW_MEMORY}")
|
|
|
|
|
|
|
# 4-bit quantization with float16 compute, shared by both models below.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Components of the vision-language model (MODEL_ID).
tokenizer_image_to_text = AutoTokenizer.from_pretrained(MODEL_ID)
image_processor = AutoImageProcessor.from_pretrained(MODEL_ID)
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = LlavaForConditionalGeneration.from_pretrained(
    MODEL_ID,
    quantization_config=quantization_config,
    device_map="auto",
)

# Image-captioning pipeline built around the already-instantiated model.
pipe_image_to_text = pipeline(
    "image-to-text",
    model=model,
    tokenizer=tokenizer_image_to_text,
    image_processor=image_processor,
    model_kwargs={"quantization_config": quantization_config},
)

# Text-generation pipeline used for the final recommendation.
# `token=True` (use the locally stored Hugging Face credentials) replaces the
# deprecated `use_auth_token` entry that used to be passed via model_kwargs.
pipe_text = pipeline(
    "text-generation",
    model=TEXT_MODEL_ID,
    token=True,
    model_kwargs={"quantization_config": quantization_config},
)
|
|
|
# Both data files are cached next to the process' working directory.
current_directory = os.getcwd()
geocoded_hotels_path = os.path.join(current_directory, 'geocoded_hotels.csv')
csv_file_path = os.path.join(current_directory, 'hotel_multimodal.csv')

# --- Geocoded hotel table (hotel ids with latitude/longitude) ---------------
if not os.path.isfile(geocoded_hotels_path):
    url = 'https://github.com/ruslanmv/watsonx-with-multimodal-llava/raw/master/geocoded_hotels.csv'
    # A timeout keeps start-up from hanging forever on a stalled connection.
    response = requests.get(url, timeout=60)
    if response.status_code == 200:
        with open(geocoded_hotels_path, 'wb') as f:
            f.write(response.content)
        print(f"File {geocoded_hotels_path} downloaded successfully!")
    else:
        print(f"Error downloading file. Status code: {response.status_code}")
        # Stop here with the real cause instead of letting pd.read_csv fail
        # below with an unexplained missing-file error.
        raise FileNotFoundError(
            f"{geocoded_hotels_path} is missing and could not be downloaded "
            f"from {url} (HTTP {response.status_code})"
        )
else:
    print(f"File {geocoded_hotels_path} already exists.")
geocoded_hotels = pd.read_csv(geocoded_hotels_path)

# --- Hotel image dataset, cached as CSV after the first download ------------
if not os.path.isfile(csv_file_path):
    dataset = load_dataset("ruslanmv/hotel-multimodal")
    df_hotels = dataset['train'].to_pandas()
    df_hotels.to_csv(csv_file_path, index=False)
    print("Dataset downloaded and saved as CSV.")
else:
    df_hotels = pd.read_csv(csv_file_path)
|
|
|
def get_current_location():
    """Look up the caller's approximate coordinates via ipinfo.io.

    Returns:
        A ``(latitude, longitude)`` tuple of floats, or ``(None, None)`` when
        the lookup fails or the response carries no usable ``loc`` field.
    """
    try:
        # Timeout so a stalled connection cannot block the request forever.
        response = requests.get('https://ipinfo.io/json', timeout=10)
        data = response.json()
        location = data.get('loc', '')
        if not location:
            return None, None
        # Parse eagerly: a malformed "loc" value (wrong number of parts or a
        # non-numeric part) raises here, inside the try, instead of later in
        # the caller when a lazy map object is unpacked.
        latitude, longitude = (float(part) for part in location.split(','))
        return latitude, longitude
    except Exception as e:
        # Best-effort lookup: any failure simply means "location unknown".
        print(f"An error occurred: {e}")
        return None, None
|
|
|
def get_coordinates(location_name):
    """Geocode a place name with Nominatim.

    Args:
        location_name: Free-text place name to look up.

    Returns:
        A ``(latitude, longitude)`` tuple, or ``None`` when the place is not
        found or the geocoding service cannot be reached.
    """
    # Imported locally so this function is self-contained; geopy is already a
    # dependency of this file (Nominatim).
    from geopy.exc import GeopyError

    geolocator = Nominatim(user_agent="coordinate_finder")
    try:
        location = geolocator.geocode(location_name)
    except GeopyError as e:
        # Timeouts / service errors are reported the same way as "not found"
        # so callers only have to handle the None case.
        print(f"Geocoding failed for {location_name!r}: {e}")
        return None

    if location is None:
        return None
    return location.latitude, location.longitude
|
|
|
def find_nearby(place=None):
    """Return the five hotels closest to a place (or to the current location).

    Args:
        place: Place name to geocode. When falsy, the location reported by
            ``get_current_location()`` is used instead.

    Returns:
        A DataFrame holding the five nearest rows of ``geocoded_hotels`` with
        an added ``distance_km`` column, sorted by ascending distance, or
        ``None`` when no reference coordinates could be determined.
    """
    if place:
        coordinates = get_coordinates(place)
        if not coordinates:
            print(f"Location not found: {place}")
            return None
        latitude, longitude = coordinates
        print(f"The coordinates of {place} are: Latitude: {latitude}, Longitude: {longitude}")
    else:
        latitude, longitude = get_current_location()
        # Compare against None explicitly: 0.0 is a valid latitude/longitude
        # (equator / prime meridian) and must not be treated as "missing".
        if latitude is None or longitude is None:
            print("Could not retrieve the current location.")
            return None

    origin = (latitude, longitude)
    distances = geocoded_hotels.apply(
        lambda row: haversine(origin, (row['latitude'], row['longitude'])),
        axis=1,
    )

    # assign() works on a copy, so the shared module-level table is not
    # modified by (possibly concurrent) searches.
    closest_hotels = (
        geocoded_hotels
        .assign(distance_km=distances)
        .sort_values(by='distance_km')
        .head(5)
    )
    print("The 5 closest locations are:\n")
    print(closest_hotels)
    return closest_hotels
|
|
|
|
|
# search_hotel() downloads images with verify=False, which makes urllib3 emit
# an InsecureRequestWarning per request; silence that warning globally.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


@spaces.GPU
def search_hotel(place=None):
    """Describe the nearest hotel to ``place`` from one of its images.

    The nearest hotel returned by ``find_nearby`` is looked up in
    ``df_hotels``; its first image is downloaded and passed to the
    image-to-text pipeline together with a review prompt.

    Args:
        place: Place name forwarded to ``find_nearby``.

    Returns:
        A DataFrame with the columns ``hotel_name``, ``hotel_id``, ``image``
        and ``description`` (one row per successfully processed image). The
        DataFrame is empty when no location was found or no image could be
        downloaded/decoded.
    """
    df_found = find_nearby(place)
    if df_found is None:
        return pd.DataFrame()

    # Only the single closest hotel is described.
    hotel_ids = df_found.head(1)["hotel_id"].tolist()

    # Work on an explicit copy so the column assignment below does not write
    # into a slice of the shared df_hotels (SettingWithCopy).
    filtered_df = df_hotels[df_hotels['hotel_id'].isin(hotel_ids)].copy()
    filtered_df['hotel_id'] = pd.Categorical(
        filtered_df['hotel_id'], categories=hotel_ids, ordered=True
    )
    # Stable sort: rows of the same hotel keep their dataset order, so the
    # image picked by head(1) is deterministic.
    filtered_df = filtered_df.sort_values('hotel_id', kind='stable').reset_index(drop=True)
    grouped_df = filtered_df.groupby('hotel_id', observed=True).head(1)

    prompt = "USER: <image>\nAnalyze this image. Give me feedback on whether this hotel is worth visiting based on the picture. Provide a summary review.\nASSISTANT:"
    description_data = []

    for _, row in grouped_df.iterrows():
        hotel_id = row['hotel_id']
        hotel_name = row['hotel_name']
        image_url = row['image_url']

        try:
            # NOTE(security): verify=False disables TLS certificate checks for
            # the image download. Kept as in the original behaviour; review
            # whether the image hosts really require it.
            response = requests.get(image_url, verify=False, timeout=30)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            outputs = pipe_image_to_text(img, prompt=prompt, generate_kwargs={"max_new_tokens": 200})
            # Keep only the text generated after the assistant marker.
            description = outputs[0]["generated_text"].split("\nASSISTANT:")[-1].strip()
            description_data.append({'hotel_name': hotel_name, 'hotel_id': hotel_id, 'image': img, 'description': description})
        except (requests.RequestException, UnidentifiedImageError):
            # Unreachable URL, timeout, HTTP error or undecodable image.
            print(f"Skipping image at URL: {image_url}")

    return pd.DataFrame(description_data)
|
|
|
|
|
def show_hotels(place=None):
    """Plot the hotel images found for ``place`` with their descriptions.

    Images are laid out two per row; the cell directly below each image holds
    its wrapped description text. Prints a message and returns ``None`` when
    ``search_hotel`` yields no rows.

    Args:
        place: Place name forwarded to ``search_hotel``.
    """
    description_df = search_hotel(place)
    if description_df.empty:
        print("No hotels found.")
        return

    num_images = len(description_df)
    num_rows = (num_images + 1) // 2

    # Two grid rows per image row: one for the picture, one for its text.
    fig, axs = plt.subplots(num_rows * 2, 2, figsize=(20, 10 * num_rows))

    # Hide every cell up front so cells that stay unused (odd image count,
    # skipped rows) do not show up as empty framed axes.
    for ax in axs.flat:
        ax.axis('off')

    current_index = 0
    for _, row in description_df.iterrows():
        img = row['image']
        description = row['description']

        if img is None:
            continue

        row_idx = (current_index // 2) * 2
        col_idx = current_index % 2

        image_ax = axs[row_idx, col_idx]
        image_ax.imshow(img)
        image_ax.axis('off')
        image_ax.set_title(f"{row['hotel_name']}\nHotel ID: {row['hotel_id']} Image {current_index + 1}", fontsize=16)

        wrapped_description = "\n".join(textwrap.wrap(description, width=50))
        text_ax = axs[row_idx + 1, col_idx]
        text_ax.text(0.5, 0.5, wrapped_description, ha='center', va='center', wrap=True, fontsize=14)
        text_ax.axis('off')

        current_index += 1

    plt.tight_layout()
    plt.show()
|
|
|
def grouped_description(description_df):
    """Collapse per-image descriptions into one row per hotel.

    Args:
        description_df: DataFrame with at least the columns ``hotel_id``,
            ``hotel_name`` and ``description``.

    Returns:
        A DataFrame with the columns ``hotel_name``, ``hotel_id`` and
        ``description``, where ``description`` is the space-joined text of all
        rows sharing the same ``hotel_id``.
    """
    # One space-joined description string per hotel id.
    joined = (
        description_df
        .groupby('hotel_id')['description']
        .apply(lambda texts: ' '.join(texts.astype(str)))
        .reset_index()
    )

    # Re-attach the hotel names; the left merge yields one row per original
    # row, so keep only the first occurrence of every hotel id.
    id_to_name = description_df[['hotel_id', 'hotel_name']]
    per_hotel = joined.merge(id_to_name, on='hotel_id', how='left')
    per_hotel = per_hotel.drop_duplicates(subset='hotel_id', keep='first')

    return per_hotel[['hotel_name', 'hotel_id', 'description']]
|
|
|
def create_prompt_result(result_df):
    """Render every hotel row as a text section for the LLM prompt.

    Args:
        result_df: DataFrame with the columns ``hotel_name``, ``hotel_id``
            and ``description``.

    Returns:
        The concatenation of one "Hotel Name / Hotel ID / Description" section
        per row, each followed by a blank line; an empty string for an empty
        DataFrame.
    """
    sections = (
        f"Hotel Name: {entry['hotel_name']}\nHotel ID: {entry['hotel_id']}\nDescription: {entry['description']}\n\n"
        for _, entry in result_df.iterrows()
    )
    return "".join(sections)
|
|
|
def build_prompt(context_result):
    """Wrap the hotel context in the instruction template for the text model.

    Args:
        context_result: Text block describing the candidate hotels.

    Returns:
        The full prompt: system message, recommendation request and
        ``context_result``, enclosed in ``[INST] ... [/INST]`` markers.
    """
    # Only the {context_result} placeholder is substituted; the literal is
    # kept flush-left so no indentation leaks into the prompt.
    template = """
<s>[INST] <<SYS>>
You are a helpful and informative chatbot assistant.
<</SYS>>
Based on the following hotel descriptions, recommend the best hotel:
{context_result}
[/INST]
"""
    return template.format(context_result=context_result)
|
@spaces.GPU
def generate_text_response(prompt):
    """Run the text-generation pipeline and return only the model's answer.

    Args:
        prompt: Full prompt string passed to ``pipe_text``.

    Returns:
        The generated text that follows the last ``[/INST]`` marker, stripped
        of surrounding whitespace (the whole generated text when the marker
        does not occur).
    """
    generated = pipe_text(prompt, max_new_tokens=500)[0]['generated_text']
    # rpartition leaves everything after the last marker in the third slot,
    # or the full string there when the marker is absent.
    _, _, answer = generated.rpartition("[/INST]")
    return answer.strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def multimodal_results(description_df):
    """Turn hotel descriptions into chatbot message pairs with images.

    Every row's image is written to a PNG file in the current working
    directory and attached to a bot message containing the hotel name and
    description.

    Args:
        description_df: DataFrame with the columns ``hotel_name``,
            ``description`` and ``image`` (PIL images).

    Returns:
        A list of ``[user_message, bot_message]`` pairs; the user message of
        each pair is empty.
    """
    import re  # local import: only needed for file-name sanitising

    # Image modes the PNG writer can store directly.
    png_modes = ("1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA")

    conversation = []
    for position, (_, row) in enumerate(description_df.iterrows()):
        hotel_name = row['hotel_name']
        description = row['description']
        img = row['image']

        # Hotel names may contain path separators or other characters that
        # are invalid in file names; the position suffix keeps rows with the
        # same hotel name from overwriting each other's file.
        safe_name = re.sub(r'[^\w.-]+', '_', str(hotel_name)).strip('._') or 'hotel'
        img_path = f"{safe_name}_{position}.png"

        # e.g. CMYK JPEGs cannot be written as PNG without conversion.
        if img.mode not in png_modes:
            img = img.convert("RGB")
        img.save(img_path)

        bot_msg = {
            "text": f"Here is {hotel_name}. {description}",
            "files": [{"file": FileData(path=img_path)}]
        }
        conversation.append([{"text": "", "files": []}, bot_msg])

    return conversation
|
|
|
def llm_results(description_df):
    """Ask the text model for a recommendation based on the descriptions.

    Args:
        description_df: DataFrame of hotel descriptions as produced by
            ``search_hotel``.

    Returns:
        A one-element conversation list holding a ``[user_message,
        bot_message]`` pair with the model's recommendation.
    """
    # descriptions -> per-hotel summary -> prompt context -> full prompt
    hotel_context = create_prompt_result(grouped_description(description_df))
    result = generate_text_response(build_prompt(hotel_context))

    user_msg = {"text": "Based on your search...", "files": []}
    bot_msg = {"text": f"**My recommendation:** {result}", "files": []}
    return [[user_msg, bot_msg]]
|
|
|
def chatbot_response(user_input, conversation):
    """Stream the chatbot conversation for a hotel search.

    This is a generator: it appends message pairs to ``conversation`` in
    place and yields the same list after every addition.

    Args:
        user_input: Place name entered by the user.
        conversation: Existing list of ``[user_message, bot_message]`` pairs.

    Yields:
        ``conversation`` after the "looking for hotels" notice, after each
        hotel message, and after the final recommendation (or after the
        apology message when nothing was found).
    """
    def text_only(text):
        # Message dict without attachments.
        return {"text": text, "files": []}

    # Immediate feedback before the slow search starts.
    conversation.append([
        text_only(user_input),
        text_only(f"Looking for hotels in {user_input}..."),
    ])
    yield conversation

    description_df = search_hotel(user_input)

    if description_df is None or description_df.empty:
        conversation.append([
            text_only(user_input),
            text_only(f"Sorry, I couldn't find any hotels for {user_input}. Please try another location."),
        ])
        yield conversation
        return

    # One message pair per described hotel image.
    for message_pair in multimodal_results(description_df):
        conversation.append(message_pair)
        yield conversation

    # Final LLM recommendation.
    for message_pair in llm_results(description_df):
        conversation.append(message_pair)
        yield conversation
|
|
|
|
|
def initial_conversation():
    """Return the greeting shown when the chatbot is first displayed.

    Returns:
        A list with a single ``[first_message, second_message]`` pair: a
        welcome banner followed by the request to enter a place.
    """
    welcome = {"text": "**Welcome to Hotel Recommendation!**", "files": []}
    instructions = {"text": "Please enter the place you're interested in visiting.", "files": []}
    return [[welcome, instructions]]
|
|
|
# Gradio UI: a multimodal chat window plus a text box and button that start a
# hotel search.
with gr.Blocks() as demo:
    gr.Markdown("# 🏨 Hotel Recommendation Chatbot")
    gr.Markdown("**Provide the location to discover hotels and receive personalized recommendations!**")

    # The chat starts with the canned welcome exchange.
    initial_conv = initial_conversation()
    chatbot = MultimodalChatbot(value=initial_conv, height=500)

    with gr.Row():
        place_input = gr.Textbox(
            label="Enter a place",
            placeholder="E.g., Paris France, Tokyo Japan, Genova Italy",
        )
        send_btn = gr.Button("Search Hotels")

    # chatbot_response is a generator, so the chat window is updated after
    # every yielded conversation state.
    send_btn.click(
        fn=chatbot_response,
        inputs=[place_input, chatbot],
        outputs=chatbot,
    )

demo.launch(debug=True)
|
|