Spaces:
Runtime error
Runtime error
| """ | |
| Utility functions for Fetii AI Chatbot | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| import re | |
| from typing import List, Dict, Any, Tuple, Optional | |
| import config | |
def clean_location_name(location: str) -> str:
    """Clean and standardize a location name.

    Surrounding whitespace is removed, trailing city/country suffixes
    (", Austin, TX", ", Austin, Texas", ", USA", ", United States") are
    stripped, and the remainder is title-cased.

    Args:
        location: Raw location text; may be None/NaN or empty.

    Returns:
        The cleaned, title-cased name, or "Unknown" when nothing usable
        remains.
    """
    if pd.isna(location) or not location:
        return "Unknown"

    cleaned = location.strip()

    # Suffixes are compared case-insensitively and BEFORE title-casing:
    # str.title() turns "TX" into "Tx" and "USA" into "Usa", so matching
    # against the title-cased text would never hit those suffixes.
    suffixes_to_remove = (", austin, tx", ", austin, texas", ", usa", ", united states")

    # Repeat until stable so stacked suffixes such as
    # "..., Austin, TX, USA" are removed completely.
    removed = True
    while removed:
        removed = False
        for suffix in suffixes_to_remove:
            if cleaned[-len(suffix):].lower() == suffix:
                cleaned = cleaned[:-len(suffix)].rstrip()
                removed = True
                break

    # Whitespace-only or suffix-only input leaves nothing to report.
    return cleaned.title() or "Unknown"
def categorize_location(location: str) -> str:
    """Return the category whose keywords match the given location.

    The lowercased location is checked against each keyword list in
    ``config.LOCATION_CATEGORIES`` in the mapping's iteration order. The
    first category with any keyword occurring as a substring wins and is
    returned title-cased; "Other" is returned when nothing matches.
    """
    haystack = location.lower()
    matched = next(
        (
            name
            for name, terms in config.LOCATION_CATEGORIES.items()
            if any(term in haystack for term in terms)
        ),
        None,
    )
    if matched is None:
        return "Other"
    return matched.title()
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate the great-circle distance between two coordinates in kilometers.

    Uses the haversine formula on a spherical Earth. Treating degrees of
    latitude and longitude as equal-length planar units overestimates
    east-west distances away from the equator, because a degree of
    longitude shrinks with cos(latitude).

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        Distance in kilometers, rounded to two decimal places.
    """
    earth_radius_km = 6371.0

    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    half_dphi = (phi2 - phi1) / 2.0
    half_dlambda = np.radians(lon2 - lon1) / 2.0

    a = np.sin(half_dphi) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(half_dlambda) ** 2
    # Clamp against floating-point drift so arcsin never sees a value > 1.
    a = min(1.0, max(0.0, float(a)))

    distance = 2.0 * earth_radius_km * float(np.arcsin(np.sqrt(a)))
    return round(distance, 2)
def format_time(hour: int) -> str:
    """Format an hour of the day as a 12-hour clock string.

    Args:
        hour: Hour of the day. Integer-like values (including floats such
            as 22.0) are accepted; values outside 0-23 wrap around the
            24-hour clock, so 24 is midnight.

    Returns:
        A string such as "12:00 AM", "9:00 AM" or "10:00 PM", or
        "Unknown" when the hour is missing or not numeric.
    """
    try:
        hour_24 = int(hour) % 24
    except (TypeError, ValueError, OverflowError):
        # None, NaN, infinities and non-numeric text have no clock value.
        return "Unknown"

    meridiem = "AM" if hour_24 < 12 else "PM"
    # 0 and 12 both display as 12 on a 12-hour clock.
    hour_12 = hour_24 % 12 or 12
    return f"{hour_12}:00 {meridiem}"
def get_time_category(hour: int) -> str:
    """Map an hour to the label of its time-of-day bucket.

    Each entry of ``config.TIME_CATEGORIES`` is read as a
    ``(start, end)`` pair that is inclusive of ``start`` and exclusive of
    ``end``. The first bucket containing ``hour`` is returned with
    underscores replaced by spaces and title-cased; "Unknown" is returned
    when no bucket contains the hour.
    """
    bucket = next(
        (
            name
            for name, (lower, upper) in config.TIME_CATEGORIES.items()
            if lower <= hour < upper
        ),
        None,
    )
    if bucket is None:
        return "Unknown"
    return bucket.replace('_', ' ').title()
def get_group_size_category(passengers: int) -> str:
    """Map a passenger count to the label of its group-size bucket.

    Each entry of ``config.GROUP_SIZE_CATEGORIES`` is read as a
    ``(min_size, max_size)`` pair with both ends inclusive. The first
    bucket containing ``passengers`` is returned with underscores
    replaced by spaces and title-cased; "Unknown" is returned otherwise.
    """
    bucket = next(
        (
            name
            for name, (smallest, largest) in config.GROUP_SIZE_CATEGORIES.items()
            if smallest <= passengers <= largest
        ),
        None,
    )
    if bucket is None:
        return "Unknown"
    return bucket.replace('_', ' ').title()
def extract_numbers_from_text(text: str) -> List[int]:
    """Return every run of digits found in ``text`` as an integer, in order.

    Signs and decimal points are not interpreted: "-3.5" yields [3, 5].
    """
    return list(map(int, re.findall(r'\d+', text)))
def parse_date_string(date_str: str) -> Optional[datetime]:
    """Parse a timestamp string using the first known format that fits.

    The candidate formats are tried in a fixed order; the first one that
    parses wins. Returns None when no format matches.
    """
    candidate_patterns = (
        '%m/%d/%y %H:%M',
        '%m/%d/%Y %H:%M',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M',
        '%m/%d/%y %H:%M:%S',
    )
    for pattern in candidate_patterns:
        try:
            parsed = datetime.strptime(date_str, pattern)
        except ValueError:
            # Not this layout; fall through to the next candidate.
            continue
        else:
            return parsed
    return None
def generate_insights(data: pd.DataFrame) -> Dict[str, Any]:
    """Generate comprehensive insights from trip data.

    Args:
        data: Trip records. Must contain a 'Total Passengers' column; the
            'hour', 'pickup_main', 'dropoff_main' and 'date' columns are
            optional and each adds its own group of metrics when present.

    Returns:
        A dict of metrics: trip and passenger totals, group-size
        statistics, and (when the matching columns exist) hourly
        distribution, top pickup/dropoff locations and date coverage.
        Ratio metrics are reported as 0.0 rather than raising or
        yielding NaN when there are no trips or no valid dates.
    """
    passengers = data['Total Passengers']
    total_trips = len(data)

    insights: Dict[str, Any] = {
        'total_trips': total_trips,
        'total_passengers': passengers.sum(),
        'avg_group_size': passengers.mean(),
        'median_group_size': passengers.median(),
    }

    if 'hour' in data.columns:
        # Compute the mode once; it is empty when there are no hour values.
        hour_modes = data['hour'].mode()
        insights['peak_hour'] = hour_modes.iloc[0] if len(hour_modes) > 0 else None
        insights['hour_distribution'] = data['hour'].value_counts().to_dict()

    if 'pickup_main' in data.columns:
        insights['top_pickups'] = data['pickup_main'].value_counts().head(10).to_dict()
        insights['unique_pickup_locations'] = data['pickup_main'].nunique()

    if 'dropoff_main' in data.columns:
        insights['top_dropoffs'] = data['dropoff_main'].value_counts().head(10).to_dict()
        insights['unique_dropoff_locations'] = data['dropoff_main'].nunique()

    insights['group_size_distribution'] = passengers.value_counts().to_dict()
    large_group_threshold = config.ANALYSIS_THRESHOLDS['large_group_threshold']
    insights['large_groups'] = len(data[passengers >= large_group_threshold])
    # Guard the ratio: an empty dataset would otherwise divide by zero.
    insights['large_groups_percentage'] = (
        (insights['large_groups'] / total_trips) * 100 if total_trips else 0.0
    )

    if 'date' in data.columns:
        first_day = data['date'].min()
        last_day = data['date'].max()
        # min()/max() are missing values when the column has no valid
        # dates; report zero coverage instead of propagating NaN.
        if pd.isna(first_day) or pd.isna(last_day):
            days_covered = 0
        else:
            days_covered = (last_day - first_day).days + 1
        insights['date_range'] = {
            'start': first_day,
            'end': last_day,
            'days_covered': days_covered,
        }
        insights['daily_average'] = total_trips / days_covered if days_covered > 0 else 0.0

    return insights
def format_number(number: float, decimals: int = 1) -> str:
    """Format a number for display, abbreviating thousands and millions.

    Args:
        number: Value to format; may be negative.
        decimals: Digits after the decimal point. With 0 decimals, values
            below one thousand are truncated to an integer.

    Returns:
        A string with an "M" suffix at one million and above, a "K"
        suffix at one thousand and above, and the plain number otherwise.
        The sign of the input is preserved.
    """
    # Pick the unit by magnitude so negative values are abbreviated the
    # same way as positive ones (-1500 -> "-1.5K").
    magnitude = abs(number)
    if magnitude >= 1000000:
        return f"{number/1000000:.{decimals}f}M"
    if magnitude >= 1000:
        return f"{number/1000:.{decimals}f}K"
    if decimals > 0:
        return f"{number:.{decimals}f}"
    return str(int(number))
def create_summary_stats(data: pd.DataFrame) -> Dict[str, str]:
    """Create formatted summary statistics for display.

    Args:
        data: Trip records, passed unchanged to ``generate_insights``.

    Returns:
        A dict mapping display labels to formatted strings.
    """
    insights = generate_insights(data)

    # generate_insights stores None for 'peak_hour' when the hour column
    # has no values, so a .get() default alone does not cover that case.
    # NOTE(review): 22 is a fallback default, not a measured peak — confirm
    # that showing it for missing data is the intended behavior.
    peak_hour = insights.get('peak_hour')
    if peak_hour is None:
        peak_hour = 22

    return {
        'Total Trips': format_number(insights['total_trips'], 0),
        'Total Passengers': format_number(insights['total_passengers'], 0),
        'Average Group Size': f"{insights['avg_group_size']:.1f}",
        # int() keeps a float mode such as 22.0 from rendering as "10.0:00 PM".
        'Peak Hour': format_time(int(peak_hour)),
        'Large Groups': f"{insights['large_groups_percentage']:.1f}%",
        'Unique Pickup Locations': format_number(insights.get('unique_pickup_locations', 0), 0),
        'Unique Destinations': format_number(insights.get('unique_dropoff_locations', 0), 0),
        'Daily Average': f"{insights.get('daily_average', 0):.1f} trips/day"
    }
def validate_data(data: pd.DataFrame) -> Tuple[bool, List[str]]:
    """Check trip data quality and report every problem found.

    Checks, in order: required columns present, dataset non-empty,
    passenger counts within 1-20 and not missing, parseable trip
    timestamps, and unique trip IDs. An empty dataset stops validation
    immediately after being reported.

    Returns:
        ``(is_valid, issues)`` where ``is_valid`` is True only when
        ``issues`` is empty.
    """
    problems: List[str] = []
    columns = data.columns

    absent = [
        name
        for name in ('Trip ID', 'Total Passengers', 'Trip Date and Time')
        if name not in columns
    ]
    if absent:
        problems.append(f"Missing required columns: {', '.join(absent)}")

    # Nothing further can be checked without rows.
    if len(data) == 0:
        problems.append("Dataset is empty")
        return False, problems

    if 'Total Passengers' in columns:
        counts = data['Total Passengers']
        out_of_range = (counts < 1) | (counts > 20) | (counts.isna())
        bad_rows = data[out_of_range]
        if len(bad_rows) > 0:
            problems.append(f"Found {len(bad_rows)} trips with invalid passenger counts")

    if 'Trip Date and Time' in columns:
        # Missing timestamps are skipped; only present-but-unparseable ones count.
        unparseable = sum(
            1
            for raw in data['Trip Date and Time'].dropna()
            if parse_date_string(str(raw)) is None
        )
        if unparseable > 0:
            problems.append(f"Found {unparseable} trips with invalid date formats")

    if 'Trip ID' in columns:
        repeated = data['Trip ID'].duplicated().sum()
        if repeated > 0:
            problems.append(f"Found {repeated} duplicate trip IDs")

    return not problems, problems
def create_export_data(data: pd.DataFrame, insights: Dict[str, Any], format_type: str = 'csv') -> Any:
    """Build an export payload for the requested format.

    Supported formats:
        'csv'     -- CSV text of ``data`` without the index.
        'json'    -- dict with export metadata (timestamp, record count,
                     ``insights``) and the rows as a list of records.
        'summary' -- the formatted summary statistics for ``data``.

    Raises:
        ValueError: If ``format_type`` is not one of the above.
    """
    if format_type == 'csv':
        return data.to_csv(index=False)

    if format_type == 'summary':
        return create_summary_stats(data)

    if format_type != 'json':
        raise ValueError(f"Unsupported export format: {format_type}")

    metadata = {
        'export_date': datetime.now().isoformat(),
        'total_records': len(data),
        'insights': insights,
    }
    return {'metadata': metadata, 'data': data.to_dict('records')}
def search_locations(query: str, locations: List[str], max_results: int = 5) -> List[str]:
    """Return up to ``max_results`` locations matching ``query``, best first.

    Matching is case-insensitive and ranked in three tiers: exact
    matches, then locations containing the whole query, then locations
    containing any single word of the query. Within a tier the input
    order is kept.
    """
    needle = query.lower()
    tokens = needle.split()
    # Lowercase each candidate once and reuse it across all three tiers.
    candidates = [(name, name.lower()) for name in locations]

    # Tier 1: exact match.
    ranked = [name for name, lowered in candidates if lowered == needle]

    # Tier 2: whole query appears as a substring.
    for name, lowered in candidates:
        if needle in lowered and name not in ranked:
            ranked.append(name)

    # Tier 3: any individual query word appears as a substring.
    for name, lowered in candidates:
        if name not in ranked and any(token in lowered for token in tokens):
            ranked.append(name)

    return ranked[:max_results]
def get_color_palette(num_colors: int) -> List[str]:
    """Get a color palette for visualizations.

    Args:
        num_colors: Number of colors wanted.

    Returns:
        A list of ``num_colors`` hex color strings ("#rrggbb"). The first
        twelve come from a fixed base palette; any beyond that are
        generated by stepping the hue by the golden-ratio conjugate at
        fixed saturation and value. A non-positive request yields an
        empty list.
    """
    base_colors = [
        '#667eea', '#764ba2', '#f093fb', '#f5576c',
        '#4facfe', '#00f2fe', '#43e97b', '#38f9d7',
        '#ffecd2', '#fcb69f', '#a8edea', '#fed6e3'
    ]

    # A negative count would otherwise slice from the end of the base
    # palette (base_colors[:-1]) and return eleven colors.
    if num_colors <= 0:
        return []

    if num_colors <= len(base_colors):
        return base_colors[:num_colors]

    import colorsys

    additional_colors = []
    for i in range(num_colors - len(base_colors)):
        # Golden-ratio hue stepping spreads successive hues apart.
        hue = (i * 0.618033988749895) % 1
        rgb = colorsys.hsv_to_rgb(hue, 0.7, 0.9)
        hex_color = '#%02x%02x%02x' % tuple(int(c * 255) for c in rgb)
        additional_colors.append(hex_color)

    return base_colors + additional_colors