Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import datetime | |
| from typing import Dict, List, Union, Optional | |
| import random | |
| from huggingface_hub import InferenceClient | |
| import logging | |
| import json | |
# Configure root logging once at import time: INFO and above, each record
# rendered as "<timestamp> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class TravelPlanner:
    """Produce a randomized day-by-day itinerary and a rough cost estimate.

    The price tables are (low, high) ranges; budget calculations use the
    midpoint of each range.
    """

    # Budget levels accepted by validate_inputs() (they are keys of
    # ``accommodation_types``) that have no same-named entry in
    # ``meal_costs``, mapped to the meal tier used for them instead.
    _MEAL_TIER_FALLBACK = {'hostel': 'street_food'}

    # Activities whose name ties them to one part of the day.
    _MORNING_ONLY = 'Morning sightseeing'
    _EVENING_ONLY = 'Evening entertainment'

    def __init__(self):
        # Accommodation price range (low, high) per budget level.
        self.accommodation_types = {
            'luxury': (200, 500),
            'mid_range': (100, 200),
            'budget': (50, 100),
            'hostel': (20, 50)
        }
        # Price range (low, high) per activity type. Not read by any method
        # of this class; kept as part of the public attribute surface.
        self.activity_costs = {
            'sightseeing': (20, 50),
            'museum': (15, 25),
            'adventure_sport': (50, 200),
            'local_tour': (30, 80),
            'cultural_experience': (25, 60)
        }
        # Price range (low, high) for a single meal, per meal tier.
        self.meal_costs = {
            'luxury': (50, 100),
            'mid_range': (20, 50),
            'budget': (10, 20),
            'street_food': (5, 10)
        }

    def validate_inputs(self,
                        destination: str,
                        num_days: int,
                        budget_level: str,
                        num_people: int) -> tuple[bool, str]:
        """Check the trip parameters.

        Returns:
            ``(True, "")`` when everything is acceptable, otherwise
            ``(False, message)`` describing the first problem found.
        """
        # A non-string destination is treated like a missing one instead of
        # failing on ``.strip()``.
        if not isinstance(destination, str) or not destination.strip():
            return False, "Destination cannot be empty"
        if num_days < 1 or num_days > 30:
            return False, "Number of days must be between 1 and 30"
        if budget_level not in self.accommodation_types:
            return False, f"Budget level must be one of: {', '.join(self.accommodation_types.keys())}"
        if num_people < 1 or num_people > 10:
            return False, "Number of people must be between 1 and 10"
        return True, ""

    def generate_itinerary(self,
                           destination: str,
                           num_days: int,
                           budget_level: str) -> List[Dict]:
        """Generate a daily itinerary.

        Each day gets three distinct activities drawn at random. An activity
        named for a time of day is always placed in its matching slot.
        ``destination`` is accepted for interface symmetry but does not
        influence the result.

        Returns:
            One dict per day with keys ``day``, ``morning``, ``afternoon``,
            ``evening`` and ``accommodation``.

        Raises:
            Exception: any error is logged and re-raised unchanged.
        """
        try:
            activities = [
                self._MORNING_ONLY,
                'Museum visit',
                'Local market tour',
                'Cultural workshop',
                'Historical site visit',
                'Nature walk',
                'Local neighborhood exploration',
                self._EVENING_ONLY
            ]
            itinerary = []
            for day in range(1, num_days + 1):
                # random.sample guarantees no duplicate activity within a day.
                day_activities = random.sample(activities, 3)
                # Keep time-specific activities in the slot their name implies.
                if self._MORNING_ONLY in day_activities:
                    day_activities.remove(self._MORNING_ONLY)
                    day_activities.insert(0, self._MORNING_ONLY)
                if self._EVENING_ONLY in day_activities:
                    day_activities.remove(self._EVENING_ONLY)
                    day_activities.append(self._EVENING_ONLY)
                itinerary.append({
                    'day': day,
                    'morning': day_activities[0],
                    'afternoon': day_activities[1],
                    'evening': day_activities[2],
                    'accommodation': f"{budget_level} accommodation"
                })
            return itinerary
        except Exception as e:
            logger.error("Error generating itinerary: %s", e)
            raise

    def calculate_budget(self,
                         num_days: int,
                         budget_level: str,
                         num_people: int) -> Dict:
        """Estimate the trip cost.

        Accommodation is charged per day (not multiplied by ``num_people``);
        meals, activities and transport are charged per person per day. A
        15% buffer is added on top of the subtotal.

        Returns:
            Dict with ``accommodation``, ``meals``, ``activities``,
            ``transport``, ``buffer``, ``total`` and ``per_person``, each
            rounded to two decimals.

        Raises:
            KeyError: ``budget_level`` is not a known level.
            ZeroDivisionError: ``num_people`` is 0.
            (Both are logged and re-raised.)
        """
        try:
            accommodation_range = self.accommodation_types[budget_level]
            # 'hostel' is a valid budget level but has no meal tier of its
            # own; map it instead of failing with KeyError.
            if budget_level in self.meal_costs:
                meal_tier = budget_level
            else:
                meal_tier = self._MEAL_TIER_FALLBACK[budget_level]
            meal_range = self.meal_costs[meal_tier]

            # Midpoint of each (low, high) range gives a stable estimate.
            daily_accommodation = sum(accommodation_range) / 2
            daily_meals = (sum(meal_range) / 2) * 3  # 3 meals per day
            daily_activities = 65  # Flat per-person daily activity estimate
            daily_transport = 30  # Flat per-person daily transport estimate

            total_accommodation = daily_accommodation * num_days
            total_meals = daily_meals * num_days * num_people
            total_activities = daily_activities * num_days * num_people
            total_transport = daily_transport * num_days * num_people

            # Add 15% buffer for unexpected expenses
            subtotal = total_accommodation + total_meals + total_activities + total_transport
            buffer = subtotal * 0.15
            total_cost = subtotal + buffer
            return {
                'accommodation': round(total_accommodation, 2),
                'meals': round(total_meals, 2),
                'activities': round(total_activities, 2),
                'transport': round(total_transport, 2),
                'buffer': round(buffer, 2),
                'total': round(total_cost, 2),
                'per_person': round(total_cost / num_people, 2)
            }
        except Exception as e:
            logger.error("Error calculating budget: %s", e)
            raise

    def format_output(self,
                      destination: str,
                      itinerary: List[Dict],
                      budget: Dict) -> str:
        """Render the itinerary and budget as a multi-line text report.

        Returns:
            The formatted report, or a fixed apology string if anything goes
            wrong while formatting (the error is logged, not raised).
        """
        try:
            output = [f"📍 Travel Plan for {destination}", ""]
            output.append("🗓️ ITINERARY")
            output.append("=" * 20)
            for day in itinerary:
                output.extend([
                    f"Day {day['day']}:",
                    f"🌅 Morning: {day['morning']}",
                    f"☀️ Afternoon: {day['afternoon']}",
                    f"🌙 Evening: {day['evening']}",
                    f"🏠 Accommodation: {day['accommodation']}",
                    ""
                ])
            output.extend([
                "💰 BUDGET BREAKDOWN",
                "=" * 20,
                f"🏨 Accommodation: ${budget['accommodation']:,.2f}",
                f"🍽️ Meals: ${budget['meals']:,.2f}",
                f"🎫 Activities: ${budget['activities']:,.2f}",
                f"🚌 Local Transport: ${budget['transport']:,.2f}",
                f"⚠️ Buffer (15%): ${budget['buffer']:,.2f}",
                "=" * 20,
                f"💵 Total Cost: ${budget['total']:,.2f}",
                f"👥 Cost per person: ${budget['per_person']:,.2f}"
            ])
            return "\n".join(output)
        except Exception as e:
            logger.error("Error formatting output: %s", e)
            return "Error formatting travel plan. Please try again."
def parse_travel_request(message: str) -> Optional[Dict]:
    """Extract trip parameters from a free-text "plan a trip" request.

    Expected shape (case-insensitive, punctuation optional)::

        Plan a trip to <destination> for <N> days, <level> budget, <M> people

    The value for each field is the token immediately before its keyword
    ("days"/"day", "budget", "people"/"person"). The destination is every
    word after the first "to" that follows the phrase "plan a trip", up to
    "for" or the first field value.

    Returns:
        Dict with ``destination`` (each word capitalized), ``num_days``
        (int), ``budget_level`` (lower-case, hyphens turned into
        underscores) and ``num_people`` (int); or ``None`` when the message
        is not a travel request or cannot be parsed.
    """
    trigger = "plan a trip"
    if not isinstance(message, str):
        return None
    lowered = message.lower()
    trigger_pos = lowered.find(trigger)
    if trigger_pos == -1:
        return None

    # Strip surrounding punctuation so "days," still matches "days".
    tokens = [tok.strip(",.;:!?") for tok in lowered.split()]
    # Number of tokens up to and including the trigger phrase; the "to" that
    # introduces the destination must come after it ("I want to plan a trip
    # to Rome" must not pick the first "to").
    search_from = len(lowered[:trigger_pos + len(trigger)].split())

    def keyword_index(*keywords: str) -> int:
        """Return the position of the first token equal to any keyword."""
        for pos, tok in enumerate(tokens):
            if tok in keywords:
                return pos
        raise ValueError(f"none of {keywords} found in message")

    try:
        destination_start = tokens.index("to", search_from) + 1
        days_idx = keyword_index("days", "day") - 1
        budget_idx = keyword_index("budget") - 1
        people_idx = keyword_index("people", "person") - 1
        # A keyword at position 0 has no value in front of it; do not let
        # index -1 wrap around to the last token.
        if min(days_idx, budget_idx, people_idx) < 0:
            raise ValueError("keyword has no preceding value")

        value_positions = {days_idx, budget_idx, people_idx}
        destination_words = []
        for pos in range(destination_start, len(tokens)):
            tok = tokens[pos]
            if pos in value_positions or tok == "for" or not tok:
                break
            destination_words.append(tok.capitalize())
        if not destination_words:
            raise ValueError("destination missing")

        return {
            "destination": " ".join(destination_words),
            "num_days": int(tokens[days_idx]),
            "budget_level": tokens[budget_idx].replace("-", "_"),
            "num_people": int(tokens[people_idx])
        }
    except (ValueError, IndexError) as e:
        logger.warning("Error parsing travel request: %s", e)
        return None
def respond(
    message: str,
    history: List[Dict[str, str]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float
) -> str:
    """Answer one chat turn.

    If ``message`` parses as a travel request, a plan is generated locally
    with ``TravelPlanner``. Otherwise the conversation is forwarded to the
    hosted chat model and its streamed reply is concatenated and returned.

    Args:
        message: The user's latest message.
        history: Prior turns. Entries may be ``{"role", "content"}`` dicts
            or ``[user_text, assistant_text]`` pairs; anything else is
            skipped.
        system_message: System prompt sent ahead of the history.
        max_tokens: Generation limit passed to the chat model.
        temperature: Sampling temperature passed to the chat model.
        top_p: Nucleus-sampling value passed to the chat model.

    Returns:
        The reply text. Failures are logged and reported as a
        human-readable message; this function does not raise.
    """
    try:
        # Check if this is a travel planning request
        travel_params = parse_travel_request(message)
        if travel_params:
            planner = TravelPlanner()
            # Validate inputs
            is_valid, error_msg = planner.validate_inputs(
                travel_params["destination"],
                travel_params["num_days"],
                travel_params["budget_level"],
                travel_params["num_people"]
            )
            if not is_valid:
                return f"Error: {error_msg}\n\nPlease use the format: 'Plan a trip to [destination] for [X] days, [budget_level] budget, [X] people'"
            try:
                # Generate travel plan
                itinerary = planner.generate_itinerary(
                    travel_params["destination"],
                    travel_params["num_days"],
                    travel_params["budget_level"]
                )
                budget = planner.calculate_budget(
                    travel_params["num_days"],
                    travel_params["budget_level"],
                    travel_params["num_people"]
                )
                return planner.format_output(
                    travel_params["destination"],
                    itinerary,
                    budget
                )
            except Exception as e:
                logger.error("Error generating travel plan: %s", e)
                return "Sorry, there was an error generating your travel plan. Please try again."

        # If not a travel request, use the Hugging Face model
        try:
            client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
            messages = [{"role": "system", "content": system_message}]
            for entry in history or []:
                if isinstance(entry, dict):
                    # Forward only role/content; drop any extra keys the UI
                    # layer may have attached to the message dict.
                    if "role" in entry and "content" in entry:
                        messages.append(
                            {"role": entry["role"], "content": entry["content"]}
                        )
                elif isinstance(entry, (list, tuple)) and len(entry) == 2:
                    # Pair-style history: (user_text, assistant_text).
                    user_text, assistant_text = entry
                    if user_text:
                        messages.append({"role": "user", "content": user_text})
                    if assistant_text:
                        messages.append({"role": "assistant", "content": assistant_text})
            messages.append({"role": "user", "content": message})

            pieces = []
            for token in client.chat_completion(
                messages,
                max_tokens=max_tokens,
                stream=True,
                temperature=temperature,
                top_p=top_p,
            ):
                # A chunk without choices carries no text; skip it rather
                # than lose the reply collected so far to an IndexError.
                if not token.choices:
                    continue
                pieces.append(getattr(token.choices[0].delta, 'content', None) or "")
            return "".join(pieces)
        except Exception as e:
            logger.error("Error with chat model: %s", e)
            return "I apologize, but I'm having trouble connecting to the chat service. You can still use me for travel planning by saying 'Plan a trip to [destination]...'"
    except Exception as e:
        logger.error("Unexpected error in respond function: %s", e)
        return "An unexpected error occurred. Please try again."
# Extra controls for the chat UI. Their values are passed to respond() after
# (message, history), in this order: system message, max tokens,
# temperature, top-p.
_extra_controls = [
    gr.Textbox(
        label="System message",
        value="You are a travel planning assistant who can also chat about other topics.",
    ),
    gr.Slider(label="Max new tokens", minimum=1, maximum=2048, value=512, step=1),
    gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, value=0.7, step=0.1),
    gr.Slider(
        label="Top-p (nucleus sampling)",
        minimum=0.1,
        maximum=1.0,
        value=0.95,
        step=0.05,
    ),
]

# Chat interface wired to respond().
demo = gr.ChatInterface(respond, additional_inputs=_extra_controls)

# Start the app only when this file is run as a script; a launch failure is
# logged rather than propagated.
if __name__ == "__main__":
    try:
        demo.launch()
    except Exception as launch_error:
        logger.error("Error launching Gradio interface: %s", launch_error)