import gradio as gr |
from gradio_multimodalchatbot import MultimodalChatbot |
from gradio.data_classes import FileData |
import os |
import pandas as pd |
import requests |
from PIL import Image, UnidentifiedImageError |
from io import BytesIO |
import matplotlib.pyplot as plt |
import urllib3 |
from transformers import pipeline |
from transformers import BitsAndBytesConfig |
import torch |
import textwrap |
import numpy as np |
from haversine import haversine |
from transformers import AutoProcessor, LlavaForConditionalGeneration |
from huggingface_hub import InferenceClient |
from transformers import AutoTokenizer |
from transformers import AutoImageProcessor |
from datasets import load_dataset |
from geopy.geocoders import Nominatim |
import pyarrow |
import spaces |
# ---------------------------------------------------------------------------
# Runtime flags, read once from the process environment / hardware.
# ---------------------------------------------------------------------------
IS_SPACES_ZERO = os.environ.get("SPACES_ZERO_GPU", "0") == "1"
IS_SPACE = os.environ.get("SPACE_ID", None) is not None
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LOW_MEMORY = os.getenv("LOW_MEMORY", "0") == "1"

# Checkpoints: MODEL_ID feeds the image-to-text pipeline, TEXT_MODEL_ID the
# text-generation pipeline.
MODEL_ID = "llava-hf/llava-1.5-7b-hf"
TEXT_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"

print(f"Using device: {DEVICE}")
print(f"Low memory: {LOW_MEMORY}")

# One 4-bit quantization config (float16 compute) shared by both models.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# ---------------------------------------------------------------------------
# Image-to-text pipeline, built from explicitly loaded components.
# ---------------------------------------------------------------------------
tokenizer_image_to_text = AutoTokenizer.from_pretrained(MODEL_ID)
image_processor = AutoImageProcessor.from_pretrained(MODEL_ID)
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = LlavaForConditionalGeneration.from_pretrained(
    MODEL_ID,
    quantization_config=quantization_config,
    device_map="auto",
)
pipe_image_to_text = pipeline(
    "image-to-text",
    model=model,
    tokenizer=tokenizer_image_to_text,
    image_processor=image_processor,
    model_kwargs={"quantization_config": quantization_config},
)

# ---------------------------------------------------------------------------
# Text-generation pipeline; the checkpoint is resolved by name here.
# ---------------------------------------------------------------------------
pipe_text = pipeline(
    "text-generation",
    model=TEXT_MODEL_ID,
    model_kwargs={
        "quantization_config": quantization_config,
        "use_auth_token": True,
    },
)
# Local cache locations (in the working directory) for the two tables the
# app needs: hotel coordinates and the hotel image/metadata dataset.
current_directory = os.getcwd()
geocoded_hotels_path = os.path.join(current_directory, 'geocoded_hotels.csv')
csv_file_path = os.path.join(current_directory, 'hotel_multimodal.csv')

# Download the geocoded hotel table on first run; later runs reuse the file.
if not os.path.isfile(geocoded_hotels_path):
    url = 'https://github.com/ruslanmv/watsonx-with-multimodal-llava/raw/master/geocoded_hotels.csv'
    # Bounded wait so start-up cannot hang forever on a stalled connection.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        # Stop here with the real cause; continuing would only make the
        # pd.read_csv call below fail with a misleading FileNotFoundError.
        raise RuntimeError(f"Error downloading file. Status code: {response.status_code}")
    with open(geocoded_hotels_path, 'wb') as f:
        f.write(response.content)
    print(f"File {geocoded_hotels_path} downloaded successfully!")
else:
    print(f"File {geocoded_hotels_path} already exists.")
geocoded_hotels = pd.read_csv(geocoded_hotels_path)

# Same caching scheme for the multimodal dataset: pull it from the Hub once,
# persist it as CSV, and read the CSV on subsequent runs.
if not os.path.exists(csv_file_path):
    dataset = load_dataset("ruslanmv/hotel-multimodal")
    df_hotels = dataset['train'].to_pandas()
    df_hotels.to_csv(csv_file_path, index=False)
    print("Dataset downloaded and saved as CSV.")
else:
    df_hotels = pd.read_csv(csv_file_path)
def get_current_location():
    """Look up approximate coordinates for this machine via ipinfo.io.

    Requests ``https://ipinfo.io/json`` and parses the comma-separated
    ``loc`` field of the JSON response.

    Returns:
        A ``(latitude, longitude)`` tuple of floats, or ``(None, None)`` when
        the request fails, the response has no ``loc`` value, or the value
        cannot be parsed.
    """
    try:
        # Bounded wait so a stalled connection cannot hang the caller.
        response = requests.get('https://ipinfo.io/json', timeout=10)
        data = response.json()
        location = data.get('loc', '')
        if not location:
            return None, None
        # Convert eagerly rather than returning a lazy ``map``: a malformed
        # value then fails inside this ``try`` instead of in the caller's
        # tuple unpacking, and the return type is always a 2-tuple.
        latitude, longitude = (float(part) for part in location.split(','))
        return latitude, longitude
    except Exception as e:
        # Best-effort lookup: any failure degrades to "location unknown".
        print(f"An error occurred: {e}")
        return None, None
def get_coordinates(location_name):
    """Geocode a place name with Nominatim.

    Args:
        location_name: Place name passed to the geocoder.

    Returns:
        A ``(latitude, longitude)`` tuple for the match, or ``None`` when the
        geocoder returns no result or raises a geopy error (timeout, service
        unavailable, ...).
    """
    # Function-scope import so the file's import block stays untouched.
    from geopy.exc import GeopyError

    geolocator = Nominatim(user_agent="coordinate_finder")
    try:
        location = geolocator.geocode(location_name)
    except GeopyError as e:
        # Report service failures as "not found" so callers can take their
        # existing ``None`` path instead of crashing mid-request.
        print(f"Geocoding failed for {location_name!r}: {e}")
        return None
    if not location:
        return None
    return location.latitude, location.longitude
def find_nearby(place=None):
    """Return the five entries of ``geocoded_hotels`` closest to a location.

    Args:
        place: Place name to geocode via ``get_coordinates``. When falsy, the
            coordinates from ``get_current_location`` are used instead.

    Returns:
        A DataFrame holding up to five rows of ``geocoded_hotels`` plus a
        ``distance_km`` column, sorted by that column in ascending order, or
        ``None`` when no reference coordinates could be determined.
    """
    if place:
        coordinates = get_coordinates(place)
        if not coordinates:
            print(f"Location not found: {place}")
            return None
        latitude, longitude = coordinates
        print(f"The coordinates of {place} are: Latitude: {latitude}, Longitude: {longitude}")
    else:
        latitude, longitude = get_current_location()
        # Compare against None explicitly: 0.0 is a legitimate coordinate
        # (equator / prime meridian) and must not count as "missing".
        if latitude is None or longitude is None:
            print("Could not retrieve the current location.")
            return None

    origin = (latitude, longitude)
    distances = geocoded_hotels.apply(
        lambda row: haversine(origin, (row['latitude'], row['longitude'])),
        axis=1,
    )
    # assign() builds a new frame, so the shared module-level table is not
    # rewritten on every search.
    with_distance = geocoded_hotels.assign(distance_km=distances)
    closest_hotels = with_distance.sort_values(by='distance_km').head(5)
    print("The 5 closest locations are:\n")
    print(closest_hotels)
    return closest_hotels
# search_hotel downloads images with certificate verification turned off;
# silence the warning urllib3 would otherwise emit for each such request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


@spaces.GPU
def search_hotel(place=None):
    """Describe the hotel nearest to *place* using the image-to-text pipeline.

    Takes the closest hotel reported by ``find_nearby``, picks one image row
    for it from ``df_hotels``, downloads that image and asks
    ``pipe_image_to_text`` for a short review.

    Args:
        place: Forwarded unchanged to ``find_nearby``.

    Returns:
        A DataFrame built from records with the keys ``hotel_name``,
        ``hotel_id``, ``image`` (PIL image) and ``description``. It is empty
        when ``find_nearby`` returns ``None`` or no image could be loaded.
    """
    df_found = find_nearby(place)
    if df_found is None:
        return pd.DataFrame()

    # Only the single closest hotel is described.
    hotel_ids = df_found.head(1)["hotel_id"].values.tolist()

    # Copy before writing the column so the assignment never targets a view
    # of df_hotels (avoids SettingWithCopyWarning and ambiguous write-back).
    filtered_df = df_hotels[df_hotels['hotel_id'].isin(hotel_ids)].copy()
    filtered_df['hotel_id'] = pd.Categorical(filtered_df['hotel_id'], categories=hotel_ids, ordered=True)
    # Stable sort: rows of the same hotel keep their dataset order, so the
    # head(1) below picks a deterministic image per hotel.
    filtered_df = filtered_df.sort_values('hotel_id', kind='mergesort').reset_index(drop=True)
    grouped_df = filtered_df.groupby('hotel_id', observed=True).head(1)

    # The prompt does not depend on the row, so build it once.
    prompt = "USER: <image>\nAnalyze this image. Give me feedback on whether this hotel is worth visiting based on the picture. Provide a summary review.\nASSISTANT:"

    description_data = []
    for _, row in grouped_df.iterrows():
        hotel_id = row['hotel_id']
        hotel_name = row['hotel_name']
        image_url = row['image_url']
        try:
            # NOTE(security): certificate verification is disabled for these
            # downloads; presumably some image hosts serve invalid
            # certificates - confirm before re-enabling verification.
            # The timeout keeps one dead host from stalling the request.
            response = requests.get(image_url, verify=False, timeout=30)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
        except (requests.RequestException, UnidentifiedImageError):
            print(f"Skipping image at URL: {image_url}")
            continue

        outputs = pipe_image_to_text(img, prompt=prompt, generate_kwargs={"max_new_tokens": 200})
        # The generated text echoes the prompt; keep what follows the marker.
        description = outputs[0]["generated_text"].split("\nASSISTANT:")[-1].strip()
        description_data.append({'hotel_name': hotel_name, 'hotel_id': hotel_id, 'image': img, 'description': description})

    return pd.DataFrame(description_data)
def show_hotels(place=None):
    """Plot the described hotel images with their generated reviews.

    Runs ``search_hotel(place)`` and lays the results out two per row: each
    image sits in one grid cell with its wrapped description in the cell
    directly below it. Prints a message and returns when nothing was found.

    Args:
        place: Forwarded unchanged to ``search_hotel``.
    """
    description_df = search_hotel(place)
    if description_df.empty:
        print("No hotels found.")
        return

    num_images = len(description_df)
    num_rows = (num_images + 1) // 2
    # Two grid rows per row of images: one for pictures, one for text.
    _, axs = plt.subplots(num_rows * 2, 2, figsize=(20, 10 * num_rows))

    current_index = 0
    for _, row in description_df.iterrows():
        img = row['image']
        description = row['description']
        if img is None:
            continue
        row_idx = (current_index // 2) * 2
        col_idx = current_index % 2
        axs[row_idx, col_idx].imshow(img)
        axs[row_idx, col_idx].set_title(f"{row['hotel_name']}\nHotel ID: {row['hotel_id']} Image {current_index + 1}", fontsize=16)
        wrapped_description = "\n".join(textwrap.wrap(description, width=50))
        axs[row_idx + 1, col_idx].text(0.5, 0.5, wrapped_description, ha='center', va='center', wrap=True, fontsize=14)
        current_index += 1

    # Hide the axes of every cell, including the ones left unused by an odd
    # image count or skipped rows, so no empty frames appear in the figure.
    for ax in axs.flat:
        ax.axis('off')

    plt.tight_layout()
    plt.show()
def grouped_description(description_df):
    """Collapse per-image descriptions into one row per hotel.

    Descriptions sharing a ``hotel_id`` are joined with single spaces, then
    the hotel name is attached from the first matching input row.

    Args:
        description_df: DataFrame with ``hotel_id``, ``hotel_name`` and
            ``description`` columns.

    Returns:
        DataFrame with the columns ``hotel_name``, ``hotel_id`` and
        ``description``, one row per distinct ``hotel_id``.
    """
    def _join_texts(texts):
        return ' '.join(texts.astype(str))

    per_hotel = description_df.groupby('hotel_id')['description'].apply(_join_texts)
    per_hotel = per_hotel.reset_index()

    name_lookup = description_df[['hotel_id', 'hotel_name']]
    merged = per_hotel.merge(name_lookup, on='hotel_id', how='left')

    # The left join repeats a hotel once per input row; keep the first hit.
    is_first = ~merged.duplicated(subset='hotel_id', keep='first')
    return merged.loc[is_first, ['hotel_name', 'hotel_id', 'description']]
def create_prompt_result(result_df):
    """Render every hotel row as a text section for the recommendation prompt.

    Args:
        result_df: DataFrame with ``hotel_name``, ``hotel_id`` and
            ``description`` columns.

    Returns:
        The concatenated sections, each terminated by a blank line; an empty
        string when ``result_df`` has no rows.
    """
    sections = (
        f"Hotel Name: {entry['hotel_name']}\nHotel ID: {entry['hotel_id']}\nDescription: {entry['description']}\n\n"
        for _, entry in result_df.iterrows()
    )
    return "".join(sections)
def build_prompt(context_result):
    """Wrap the hotel descriptions in the instruction template for the LLM.

    Args:
        context_result: Text block describing the candidate hotels.

    Returns:
        The full prompt: a leading newline, the ``[INST]``/``<<SYS>>`` header,
        the request line, ``context_result`` and the closing ``[/INST]`` line.
    """
    prompt_lines = [
        "",  # the template opens with a newline
        "<s>[INST] <<SYS>>",
        "You are a helpful and informative chatbot assistant.",
        "<</SYS>>",
        "Based on the following hotel descriptions, recommend the best hotel:",
        f"{context_result}",
        "[/INST]",
        "",  # ... and ends with one
    ]
    return "\n".join(prompt_lines)
@spaces.GPU
def generate_text_response(prompt):
    """Run the text-generation pipeline and return only the model's answer.

    Args:
        prompt: Prompt passed to ``pipe_text``.

    Returns:
        The generated text that follows the last ``[/INST]`` marker, with
        surrounding whitespace stripped (the whole text when the marker is
        absent).
    """
    generations = pipe_text(prompt, max_new_tokens=500)
    full_text = generations[0]['generated_text']
    # rpartition yields the tail after the final marker, or the entire
    # string when the marker never occurs.
    _, _, answer = full_text.rpartition("[/INST]")
    return answer.strip()
def multimodal_results(description_df):
    """Convert hotel descriptions into chatbot message pairs with images.

    Each row's image is written as a PNG into the working directory and
    attached to a bot message presenting the hotel.

    Args:
        description_df: DataFrame with ``hotel_name``, ``description`` and
            ``image`` (PIL image) columns.

    Returns:
        A list of ``[user_message, bot_message]`` pairs, one per row; the
        user side of every pair is empty.
    """
    import re  # function-scope import: only needed for file-name cleaning

    # Modes Pillow can write to PNG directly; anything else (e.g. CMYK from
    # a JPEG) has to be converted before saving.
    png_modes = {"1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA"}

    conversation = []
    for position, (_, row) in enumerate(description_df.iterrows()):
        hotel_name = row['hotel_name']
        description = row['description']
        img = row['image']

        # Hotel names may contain path separators or other characters that
        # are not valid in a file name; reduce them to a safe stem. The row
        # position keeps two images of the same hotel from overwriting each
        # other within one result set.
        safe_stem = re.sub(r'[^\w.-]+', '_', str(hotel_name)).strip('._') or 'hotel'
        img_path = f"{safe_stem}_{position}.png"

        if img.mode not in png_modes:
            img = img.convert("RGB")
        img.save(img_path)

        bot_msg = {
            "text": f"Here is {hotel_name}. {description}",
            "files": [{"file": FileData(path=img_path)}]
        }
        conversation.append([{"text": "", "files": []}, bot_msg])
    return conversation
def llm_results(description_df):
    """Ask the text model for a recommendation over the described hotels.

    Args:
        description_df: DataFrame of hotel descriptions, passed to
            ``grouped_description``.

    Returns:
        A one-element list holding a ``[user_message, bot_message]`` pair
        whose bot side carries the model's recommendation.
    """
    # descriptions -> per-hotel summary -> prompt context -> full prompt
    prompt_context = create_prompt_result(grouped_description(description_df))
    result = generate_text_response(build_prompt(prompt_context))

    user_side = {"text": "Based on your search...", "files": []}
    bot_side = {"text": f"**My recommendation:** {result}", "files": []}
    return [[user_side, bot_side]]
def chatbot_response(user_input, conversation):
    """Stream the chat history while hotels are searched and ranked.

    Generator used as the button's click handler. It appends to
    ``conversation`` in place and yields it after every addition: first a
    "looking..." acknowledgement, then either an apology when nothing was
    found, or one pair per hotel followed by the final recommendation.

    Args:
        user_input: Place name typed by the user.
        conversation: Current list of ``[user_message, bot_message]`` pairs.

    Yields:
        The same ``conversation`` list after each appended pair.
    """
    def _text_message(text):
        # Message payload without attachments.
        return {"text": text, "files": []}

    conversation.append([
        _text_message(user_input),
        _text_message(f"Looking for hotels in {user_input}..."),
    ])
    yield conversation

    description_df = search_hotel(user_input)
    has_results = description_df is not None and not description_df.empty
    if not has_results:
        conversation.append([
            _text_message(user_input),
            _text_message(f"Sorry, I couldn't find any hotels for {user_input}. Please try another location."),
        ])
        yield conversation
        return

    # Run the producers one after the other so the hotel cards are streamed
    # before the (slower) recommendation is generated.
    for produce_pairs in (multimodal_results, llm_results):
        for message_pair in produce_pairs(description_df):
            conversation.append(message_pair)
            yield conversation
def initial_conversation():
    """Return the greeting exchange shown before the user has searched.

    Returns:
        A list with a single ``[user_message, bot_message]`` pair.
    """
    welcome = {"text": "**Welcome to Hotel Recommendation!**", "files": []}
    instruction = {"text": "Please enter the place you're interested in visiting.", "files": []}
    return [[welcome, instruction]]
# Gradio front end: a chatbot pane seeded with the greeting, plus a text box
# and button that stream chatbot_response into the pane.
# NOTE(review): the nesting below (text box and button inside the Row, click
# wiring inside the Blocks context) is the conventional layout - confirm it
# matches the intended UI.
with gr.Blocks() as demo:
    gr.Markdown("# 🏨 Hotel Recommendation Chatbot")
    gr.Markdown("**Provide the location to discover hotels and receive personalized recommendations!**")

    initial_conv = initial_conversation()
    chatbot = MultimodalChatbot(value=initial_conv, height=500)

    with gr.Row():
        place_input = gr.Textbox(
            label="Enter a place",
            placeholder="E.g., Paris France, Tokyo Japan, Genova Italy",
        )
        send_btn = gr.Button("Search Hotels")

    # The chatbot component is both an input (current history) and the output.
    send_btn.click(
        chatbot_response,
        inputs=[place_input, chatbot],
        outputs=chatbot,
    )

demo.launch(debug=True)