"""Gradio hotel-recommendation chatbot.

Given a place name, the app finds the closest hotels in a geocoded CSV,
asks a LLaVA image-to-text pipeline to review a picture of each hotel, and
then asks a Mistral text-generation pipeline to recommend one of them based
on those reviews. Results are streamed into a multimodal chat widget.
"""

import os
import re
import textwrap
from io import BytesIO

import gradio as gr
from gradio_multimodalchatbot import MultimodalChatbot
from gradio.data_classes import FileData
import pandas as pd
import requests
from PIL import Image, UnidentifiedImageError
import matplotlib.pyplot as plt
import urllib3
from transformers import pipeline
from transformers import BitsAndBytesConfig
import torch
import numpy as np
from haversine import haversine  # Install haversine library: pip install haversine
from transformers import AutoProcessor, LlavaForConditionalGeneration
from huggingface_hub import InferenceClient
from transformers import AutoTokenizer
from transformers import AutoImageProcessor
from datasets import load_dataset
from geopy.geocoders import Nominatim
import pyarrow
import spaces

IS_SPACES_ZERO = os.environ.get("SPACES_ZERO_GPU", "0") == "1"
IS_SPACE = os.environ.get("SPACE_ID", None) is not None

# Constants
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LOW_MEMORY = os.getenv("LOW_MEMORY", "0") == "1"
MODEL_ID = "llava-hf/llava-1.5-7b-hf"
TEXT_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"
# Seconds to wait on any outbound HTTP request before giving up, so a stalled
# remote host cannot hang start-up or a chat request forever.
REQUEST_TIMEOUT = 30
# Characters that are path separators or otherwise invalid in file names on
# common platforms; replaced before a hotel name is used as a file name.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')

# Print device and memory info
print(f"Using device: {DEVICE}")
print(f"Low memory: {LOW_MEMORY}")

# Quantization configuration for efficient model loading
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

# Load the tokenizer associated with MODEL_ID
tokenizer_image_to_text = AutoTokenizer.from_pretrained(MODEL_ID)

# Load the image processor associated with MODEL_ID
image_processor = AutoImageProcessor.from_pretrained(MODEL_ID)

# Load models only once, at import time
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = LlavaForConditionalGeneration.from_pretrained(
    MODEL_ID,
    quantization_config=quantization_config,
    device_map="auto",
)

# Pass the tokenizer and image processor explicitly to the pipeline
pipe_image_to_text = pipeline(
    "image-to-text",
    model=model,
    tokenizer=tokenizer_image_to_text,
    image_processor=image_processor,
    model_kwargs={"quantization_config": quantization_config},
)

# Initialize the text generation pipeline
pipe_text = pipeline(
    "text-generation",
    model=TEXT_MODEL_ID,
    model_kwargs={
        "quantization_config": quantization_config,
        "use_auth_token": True  # This will use the environment variable
    }
)

# Ensure data files are available
current_directory = os.getcwd()
geocoded_hotels_path = os.path.join(current_directory, 'geocoded_hotels.csv')
csv_file_path = os.path.join(current_directory, 'hotel_multimodal.csv')

# Load geocoded hotels data, downloading the CSV on first run
if not os.path.isfile(geocoded_hotels_path):
    url = 'https://github.com/ruslanmv/watsonx-with-multimodal-llava/raw/master/geocoded_hotels.csv'
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    if response.status_code == 200:
        with open(geocoded_hotels_path, 'wb') as f:
            f.write(response.content)
        print(f"File {geocoded_hotels_path} downloaded successfully!")
    else:
        print(f"Error downloading file. Status code: {response.status_code}")
else:
    print(f"File {geocoded_hotels_path} already exists.")
geocoded_hotels = pd.read_csv(geocoded_hotels_path)

# Load hotel dataset, caching it as a CSV next to the app on first run
if not os.path.exists(csv_file_path):
    dataset = load_dataset("ruslanmv/hotel-multimodal")
    df_hotels = dataset['train'].to_pandas()
    df_hotels.to_csv(csv_file_path, index=False)
    print("Dataset downloaded and saved as CSV.")
else:
    df_hotels = pd.read_csv(csv_file_path)


def get_current_location():
    """Look up the caller's approximate position from ipinfo.io.

    Returns:
        A ``(latitude, longitude)`` tuple of floats, or ``(None, None)`` when
        the request fails, the response has no ``loc`` field, or the field
        cannot be parsed.
    """
    try:
        response = requests.get('https://ipinfo.io/json', timeout=REQUEST_TIMEOUT)
        data = response.json()
        location = data.get('loc', '')
        if location:
            # Convert eagerly so a malformed value is handled by the except
            # below instead of surfacing later when the caller unpacks it.
            latitude, longitude = map(float, location.split(','))
            return latitude, longitude
        return None, None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None, None


def get_coordinates(location_name):
    """Geocode *location_name* with Nominatim.

    Returns:
        A ``(latitude, longitude)`` tuple, or ``None`` when the geocoder
        returns no match.
    """
    geolocator = Nominatim(user_agent="coordinate_finder")
    location = geolocator.geocode(location_name)
    if location:
        return location.latitude, location.longitude
    return None


def find_nearby(place=None):
    """Return the five hotels from ``geocoded_hotels`` closest to a position.

    Args:
        place: Place name to geocode. When falsy, the position reported by
            ``get_current_location`` is used instead.

    Returns:
        A DataFrame holding the five nearest rows of ``geocoded_hotels`` with
        an added ``distance_km`` column, sorted by ascending distance, or
        ``None`` when no position could be determined.
    """
    if place:
        coordinates = get_coordinates(place)
        if coordinates:
            latitude, longitude = coordinates
            print(f"The coordinates of {place} are: Latitude: {latitude}, Longitude: {longitude}")
        else:
            print(f"Location not found: {place}")
            return None
    else:
        latitude, longitude = get_current_location()
        # Compare against None explicitly: 0.0 is a valid coordinate.
        if latitude is None or longitude is None:
            print("Could not retrieve the current location.")
            return None

    distances = geocoded_hotels.apply(
        lambda row: haversine((latitude, longitude), (row['latitude'], row['longitude'])),
        axis=1
    )
    # Work on a per-call frame so concurrent requests do not overwrite each
    # other's distances on the shared module-level DataFrame.
    closest_hotels = (
        geocoded_hotels.assign(distance_km=distances)
        .sort_values(by='distance_km')
        .head(5)
    )
    print("The 5 closest locations are:\n")
    print(closest_hotels)
    return closest_hotels


# Suppress InsecureRequestWarning (image downloads below skip TLS verification)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


@spaces.GPU
def search_hotel(place=None):
    """Find the closest hotel to *place* and generate a review of its picture.

    Args:
        place: Place name forwarded to ``find_nearby``.

    Returns:
        A DataFrame with columns ``hotel_name``, ``hotel_id``, ``image`` (a PIL
        image) and ``description`` (text generated by the image-to-text
        pipeline). The frame is empty when no location was resolved or every
        image download failed.
    """
    df_found = find_nearby(place)
    if df_found is None:
        return pd.DataFrame()

    # Keep only the closest hotel to save Hugging Face ZeroGPU runtime.
    df_found = df_found.head(1)
    hotel_ids = df_found["hotel_id"].values.tolist()

    # Take an explicit copy so the column assignment below neither triggers
    # SettingWithCopyWarning nor silently keeps the original dtype.
    filtered_df = df_hotels[df_hotels['hotel_id'].isin(hotel_ids)].copy()
    # An ordered categorical makes sort_values follow the proximity order of
    # hotel_ids rather than the numeric order of the ids.
    filtered_df['hotel_id'] = pd.Categorical(
        filtered_df['hotel_id'], categories=hotel_ids, ordered=True
    )
    filtered_df = filtered_df.sort_values('hotel_id').reset_index(drop=True)
    # One picture per hotel.
    grouped_df = filtered_df.groupby('hotel_id', observed=True).head(1)

    # LLaVA 1.5 needs the <image> placeholder to know where the picture goes.
    prompt = (
        "USER: <image>\nAnalyze this image. Give me feedback on whether this hotel "
        "is worth visiting based on the picture. Provide a summary review.\nASSISTANT:"
    )

    description_data = []
    for _, row in grouped_df.iterrows():
        hotel_id = row['hotel_id']
        hotel_name = row['hotel_name']
        image_url = row['image_url']
        try:
            # NOTE(review): verify=False disables TLS certificate validation
            # for the image hosts. Kept as in the original; confirm it is
            # still required before relying on these downloads.
            response = requests.get(image_url, verify=False, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            outputs = pipe_image_to_text(img, prompt=prompt, generate_kwargs={"max_new_tokens": 200})
            description = outputs[0]["generated_text"].split("\nASSISTANT:")[-1].strip()
            description_data.append({
                'hotel_name': hotel_name,
                'hotel_id': hotel_id,
                'image': img,
                'description': description,
            })
        except (requests.RequestException, UnidentifiedImageError):
            print(f"Skipping image at URL: {image_url}")

    return pd.DataFrame(description_data)


def show_hotels(place=None):
    """Plot each hotel picture found for *place* above its generated review.

    Pictures are laid out two per row; each picture occupies one subplot and
    its wrapped description the subplot directly underneath. Prints a message
    and returns without plotting when ``search_hotel`` finds nothing.
    """
    description_df = search_hotel(place)
    if description_df.empty:
        print("No hotels found.")
        return

    num_images = len(description_df)
    num_rows = (num_images + 1) // 2
    # Two subplot rows per picture row: one for the image, one for the text.
    _, axs = plt.subplots(num_rows * 2, 2, figsize=(20, 10 * num_rows))

    current_index = 0
    for _, row in description_df.iterrows():
        img = row['image']
        description = row['description']
        if img is None:
            continue
        row_idx = (current_index // 2) * 2
        col_idx = current_index % 2
        axs[row_idx, col_idx].imshow(img)
        axs[row_idx, col_idx].set_title(
            f"{row['hotel_name']}\nHotel ID: {row['hotel_id']} Image {current_index + 1}",
            fontsize=16,
        )
        wrapped_description = "\n".join(textwrap.wrap(description, width=50))
        axs[row_idx + 1, col_idx].text(
            0.5, 0.5, wrapped_description,
            ha='center', va='center', wrap=True, fontsize=14,
        )
        current_index += 1

    # Hide the frames of every subplot, including slots left unused when the
    # number of pictures is odd.
    for ax in axs.ravel():
        ax.axis('off')

    plt.tight_layout()
    plt.show()


def grouped_description(description_df):
    """Collapse *description_df* to one row per hotel.

    Descriptions sharing a ``hotel_id`` are joined with single spaces.

    Returns:
        A DataFrame with columns ``hotel_name``, ``hotel_id``, ``description``.
    """
    grouped_descriptions = (
        description_df.groupby('hotel_id')['description']
        .apply(lambda x: ' '.join(x.astype(str)))
        .reset_index()
    )
    result_df = pd.merge(
        grouped_descriptions,
        description_df[['hotel_id', 'hotel_name']],
        on='hotel_id',
        how='left',
    )
    result_df = result_df.drop_duplicates(subset='hotel_id', keep='first')
    return result_df[['hotel_name', 'hotel_id', 'description']]


def create_prompt_result(result_df):
    """Render each row of *result_df* as a name / id / description text block.

    Returns:
        The blocks concatenated into one string, each followed by a blank
        line; an empty string when *result_df* has no rows.
    """
    return "".join(
        f"Hotel Name: {row['hotel_name']}\nHotel ID: {row['hotel_id']}\n"
        f"Description: {row['description']}\n\n"
        for _, row in result_df.iterrows()
    )


def build_prompt(context_result):
    """Wrap *context_result* in the instruction prompt sent to the text model."""
    # NOTE(review): the bare "<>" markers look like system-prompt tags whose
    # contents were lost; left untouched because the intended tags cannot be
    # determined from this file - confirm against the model's chat template.
    hotel_recommendation_template = (
        " [INST] <> You are a helpful and informative chatbot assistant. <> "
        "Based on the following hotel descriptions, recommend the best hotel: "
        "{context_result} [/INST] "
    )
    return hotel_recommendation_template.format(context_result=context_result)


@spaces.GPU
def generate_text_response(prompt):
    """Run *prompt* through the text pipeline and return only the answer.

    The pipeline output is split on ``[/INST]`` and the text after the last
    marker is returned, stripped of surrounding whitespace.
    """
    outputs = pipe_text(prompt, max_new_tokens=500)
    response = outputs[0]['generated_text'].split("[/INST]")[-1].strip()
    return response


def multimodal_results(description_df):
    """Build one chat message pair per hotel, each with its picture attached.

    Every image is saved as a PNG in the current working directory so it can
    be referenced from the chat message.

    Returns:
        A list of ``[user_message, bot_message]`` pairs, where the user
        message is empty and the bot message carries the description and the
        saved image file.
    """
    conversation = []
    for _, row in description_df.iterrows():
        hotel_name = row['hotel_name']
        description = row['description']
        img = row['image']

        # Hotel names may contain path separators or other characters that are
        # invalid in file names; neutralise them so the save cannot fail or
        # write outside the working directory.
        safe_name = _UNSAFE_FILENAME_CHARS.sub('_', str(hotel_name))
        img_path = f"{safe_name}.png"
        # PNG cannot store CMYK pixel data, which some JPEG sources use.
        if img.mode == "CMYK":
            img = img.convert("RGB")
        img.save(img_path)

        bot_msg = {
            "text": f"Here is {hotel_name}. {description}",
            "files": [{"file": FileData(path=img_path)}]
        }
        conversation.append([{"text": "", "files": []}, bot_msg])
    return conversation


def llm_results(description_df):
    """Ask the text model for a recommendation based on *description_df*.

    Returns:
        A single-element list holding the ``[user_message, bot_message]`` pair
        with the recommendation text.
    """
    result_df = grouped_description(description_df)
    context_result = create_prompt_result(result_df)
    recommendation_prompt = build_prompt(context_result)
    result = generate_text_response(recommendation_prompt)
    conversation = [[
        {"text": "Based on your search...", "files": []},
        {"text": f"**My recommendation:** {result}", "files": []},
    ]]
    return conversation


def chatbot_response(user_input, conversation):
    """Stream the chat updates for one hotel search.

    Args:
        user_input: Place name typed by the user.
        conversation: Current chat history as a list of message pairs;
            ``None`` is treated as an empty history.

    Yields:
        The growing conversation after each step: the "looking for hotels"
        notice, then either an error message or one message per hotel
        followed by the final recommendation.
    """
    # The chat widget may hand over None when it has no history yet.
    if conversation is None:
        conversation = []

    bot_initial_message = {
        "text": f"Looking for hotels in {user_input}...",
        "files": []
    }
    conversation.append([{"text": user_input, "files": []}, bot_initial_message])
    yield conversation

    description_df = search_hotel(user_input)
    if description_df is None or description_df.empty:
        error_message = {
            "text": f"Sorry, I couldn't find any hotels for {user_input}. Please try another location.",
            "files": []
        }
        conversation.append([{"text": user_input, "files": []}, error_message])
        yield conversation
        return  # Nothing more to show

    for message_pair in multimodal_results(description_df):
        conversation.append(message_pair)
        yield conversation

    for message_pair in llm_results(description_df):
        conversation.append(message_pair)
        yield conversation


def initial_conversation():
    """Return the welcome exchange shown when the app starts."""
    return [[
        {"text": "**Welcome to Hotel Recommendation!**", "files": []},
        {"text": "Please enter the place you're interested in visiting.", "files": []}
    ]]


with gr.Blocks() as demo:
    gr.Markdown("# 🏨 Hotel Recommendation Chatbot")
    gr.Markdown("**Provide the location to discover hotels and receive personalized recommendations!**")
    initial_conv = initial_conversation()
    chatbot = MultimodalChatbot(value=initial_conv, height=500)
    with gr.Row():
        place_input = gr.Textbox(label="Enter a place", placeholder="E.g., Paris France, Tokyo Japan, Genova Italy")
        send_btn = gr.Button("Search Hotels")
    send_btn.click(chatbot_response, inputs=[place_input, chatbot], outputs=chatbot)

demo.launch(debug=True)