Spaces:
Running
on
Zero
Running
on
Zero
| from datetime import datetime | |
| import json | |
| import os | |
| import pytz | |
| import traceback | |
class UserHistoryManager:
    """Persist and present recent search history in a local JSON file.

    The history is stored as a JSON array in ``self.history_file``. Each
    entry is a dict with a ``timestamp`` and ``search_type`` and, when
    supplied, ``user_description``, ``results`` and ``preferences``.

    All public methods are best-effort: they report failures via ``print``
    and return a neutral value (``False``, ``[]``, an error snippet or zeroed
    statistics) instead of raising.
    """

    # Number of most recent entries kept in the history file.
    MAX_HISTORY_ENTRIES = 20
    # Number of leading results stored with each entry.
    MAX_RESULTS_PER_ENTRY = 15
    # Stored descriptions longer than this are cut and suffixed with "...".
    MAX_DESCRIPTION_CHARS = 200

    # Keys copied from ``user_preferences`` into a stored entry.
    _PREFERENCE_KEYS = (
        'living_space',
        'exercise_time',
        'grooming_commitment',
        'experience_level',
        'has_children',
        'noise_tolerance',
        'size_preference',
    )

    def __init__(self):
        """Initialize history record manager"""
        self.history_file = "user_history.json"
        print(f"Initializing UserHistoryManager with file: {os.path.abspath(self.history_file)}")
        self._init_file()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_float(x, default=0.0):
        """Return ``float(x)``, or ``default`` when the conversion fails."""
        try:
            return float(x)
        except Exception:
            # Deliberately broad: score objects may come from third-party
            # numeric types whose conversion errors are not predictable.
            return default

    @staticmethod
    def _to_int(x, default=0):
        """Return ``int(x)``, or ``default`` when the conversion fails."""
        try:
            return int(x)
        except Exception:
            return default

    @staticmethod
    def _empty_statistics() -> dict:
        """Return a statistics dict with every counter at its zero value."""
        return {
            'total_searches': 0,
            'criteria_searches': 0,
            'description_searches': 0,
            'most_searched_breeds': {},
            'search_frequency_by_day': {},
        }

    def _write_entries(self, entries: list) -> None:
        """Atomically replace the history file with ``entries``.

        The data is written to a sibling ``.tmp`` file first and then moved
        into place, so a failed write never leaves a truncated history file.

        Raises:
            Exception: whatever the write or the move raised; the temporary
                file is removed before re-raising.
        """
        temp_file = f"{self.history_file}.tmp"
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(entries, f, ensure_ascii=False, indent=2)
            # os.replace overwrites an existing destination on every
            # platform; os.rename raises FileExistsError on Windows.
            os.replace(temp_file, self.history_file)
        except Exception:
            if os.path.exists(temp_file):
                os.remove(temp_file)
            raise

    def _backup_corrupted_file(self) -> None:
        """Move an unreadable history file aside under a timestamped name."""
        if not os.path.exists(self.history_file):
            return
        backup_file = f"{self.history_file}.backup.{int(datetime.now().timestamp())}"
        os.replace(self.history_file, backup_file)
        print(f"Backed up corrupted file to {backup_file}")

    def _read_entries(self) -> list:
        """Load the stored entries without creating or rewriting the file.

        Returns:
            The stored list. An empty list is returned when the file is
            missing, empty, holds valid JSON that is not a list, or is
            corrupted. A corrupted file (invalid JSON or invalid UTF-8) is
            moved to a backup first, so afterwards the history file no
            longer exists.
        """
        if not os.path.exists(self.history_file) or os.path.getsize(self.history_file) == 0:
            return []
        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                content = f.read().strip()
            if not content:
                print("File content is empty, returning empty list")
                return []
            data = json.loads(content)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"JSON decode error when reading history: {str(e)}")
            self._backup_corrupted_file()
            return []
        return data if isinstance(data, list) else []

    def _init_file(self):
        """Create the history file if it is missing, else report its size."""
        try:
            if not os.path.exists(self.history_file):
                print(f"Creating new history file: {self.history_file}")
                self._write_entries([])
            else:
                print(f"History file exists: {self.history_file}")
                # An empty file cannot be parsed as JSON, so check first.
                if os.path.getsize(self.history_file) > 0:
                    with open(self.history_file, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    print(f"Current history entries: {len(data)}")
                else:
                    print("History file is empty.")
        except Exception as e:
            print(f"Error in _init_file: {str(e)}")
            print(traceback.format_exc())

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def save_history(self, user_preferences: dict = None, results: list = None, search_type: str = "criteria", description: str = None, user_description: str = None) -> bool:
        """Append one search to the history file.

        Args:
            user_preferences: Optional mapping; the keys listed in
                ``_PREFERENCE_KEYS`` are copied into the entry.
            results: Optional list of result dicts. Only the first
                ``MAX_RESULTS_PER_ENTRY`` are stored, reduced to breed, rank,
                score fields and size.
            search_type: Stored as-is; ``"description"`` additionally stores
                the description text.
            description: Description text, used when ``user_description`` is
                empty.
            user_description: Preferred description text.

        Returns:
            True when the entry was written, False when anything failed (the
            error and traceback are printed).
        """
        try:
            taipei_tz = pytz.timezone('Asia/Taipei')
            current_time = datetime.now(taipei_tz)

            history_entry = {
                "timestamp": current_time.strftime("%Y-%m-%d %H:%M:%S"),
                "search_type": search_type
            }

            description_text = user_description or description
            if search_type == "description" and description_text:
                limit = self.MAX_DESCRIPTION_CHARS
                if len(description_text) > limit:
                    description_text = description_text[:limit] + "..."
                history_entry["user_description"] = description_text

            if results and isinstance(results, list):
                history_entry["results"] = [
                    {
                        "breed": str(r.get("breed", "Unknown")),
                        # Fall back to the 1-based position when no rank is given.
                        "rank": self._to_int(r.get("rank", i)),
                        # Prefer overall_score, fall back to final_score;
                        # always stored as float.
                        "overall_score": self._to_float(r.get("overall_score", r.get("final_score", 0))),
                        # Bonus scores that description searches commonly
                        # attach; converted with the same safe cast.
                        "semantic_score": self._to_float(r.get("semantic_score", 0)),
                        "comparative_bonus": self._to_float(r.get("comparative_bonus", 0)),
                        "lifestyle_bonus": self._to_float(r.get("lifestyle_bonus", 0)),
                        "size": str(r.get("size", "Unknown")),
                    }
                    for i, r in enumerate(results[:self.MAX_RESULTS_PER_ENTRY], start=1)
                ]

            if user_preferences:
                history_entry["preferences"] = {
                    key: user_preferences.get(key) for key in self._PREFERENCE_KEYS
                }

            # A corrupted file is backed up and a non-list payload is treated
            # as empty, so one bad file cannot block every later save.
            history = self._read_entries()
            history.append(history_entry)
            history = history[-self.MAX_HISTORY_ENTRIES:]  # Keep recent entries only
            self._write_entries(history)

            print(f"Successfully saved history entry for {search_type} search.")
            return True
        except Exception as e:
            print(f"Error saving history: {str(e)}")
            print(traceback.format_exc())
            return False

    def get_history(self) -> list:
        """Return the stored history entries, oldest first.

        A missing, empty or corrupted history file is (re)initialized to an
        empty JSON array; a corrupted file is backed up first.

        Returns:
            The list of entries, or ``[]`` when there are none or reading
            failed.
        """
        try:
            print("Attempting to read history")  # Debug
            entries = self._read_entries()

            # Missing from the start, empty, or just moved aside as corrupted:
            # leave a valid empty array behind for later readers.
            if not os.path.exists(self.history_file) or os.path.getsize(self.history_file) == 0:
                print("History file missing or empty, initializing with empty array")
                self._write_entries([])
                return []

            print(f"Read {len(entries)} history entries")  # Debug
            return entries
        except Exception as e:
            print(f"Error reading history: {str(e)}")
            print(traceback.format_exc())
            return []

    def clear_all_history(self) -> bool:
        """Clear all history records.

        Returns:
            True when the file was reset to an empty array, False otherwise.
        """
        try:
            print("Attempting to clear all history")  # Debug
            self._write_entries([])
            print("History cleared successfully")  # Debug
            return True
        except Exception as e:
            print(f"Error clearing history: {str(e)}")
            print(traceback.format_exc())
            return False

    def format_history_for_display(self) -> str:
        """Format history records for HTML display.

        Entries are rendered newest first. Every value taken from the history
        file (description, preferences, breed names, timestamp) is
        HTML-escaped before being placed in the markup.

        Returns:
            str: Formatted HTML string; a placeholder when there is no
            history, or an error snippet when formatting failed.
        """
        # Imported locally so this method does not depend on a file-level import.
        import html

        try:
            history = self.get_history()

            if not history:
                return """
                <div style="text-align: center; padding: 20px; color: #718096;">
                    <p>No search history yet</p>
                </div>
                """

            html_parts = ["""
            <div style="max-height: 400px; overflow-y: auto;">
            """]

            for entry in reversed(history):  # Latest entries first
                search_type = entry.get('search_type', 'criteria')
                timestamp = html.escape(str(entry.get('timestamp', 'Unknown time')))
                results = entry.get('results', [])

                # Set tag color based on search type
                if search_type == 'description':
                    tag_color = "#4299e1"  # Blue
                    tag_bg = "rgba(66, 153, 225, 0.1)"
                    tag_text = "Description Search"
                    icon = "🤖"
                else:
                    tag_color = "#48bb78"  # Green
                    tag_bg = "rgba(72, 187, 120, 0.1)"
                    tag_text = "Criteria Search"
                    icon = "🔍"

                # Search content preview
                preview_content = ""
                if search_type == 'description':
                    user_desc = entry.get('user_description', '')
                    if user_desc:
                        preview_content = f"Description: {user_desc}"
                else:
                    prefs = entry.get('preferences', {})
                    if prefs:
                        living = prefs.get('living_space', '')
                        size = prefs.get('size_preference', '')
                        exercise = prefs.get('exercise_time', '')
                        preview_content = f"Living: {living}, Size: {size}, Exercise: {exercise}min"

                # Result summary
                result_summary = ""
                if results:
                    top_breeds = [str(r.get('breed', 'Unknown')) for r in results[:3]]
                    result_summary = f"Recommended: {', '.join(top_breeds)}"
                    if len(results) > 3:
                        result_summary += f" and {len(results)} breeds total"

                # Escape only after the text is assembled, so the escaping
                # covers every stored value that went into it.
                preview_block = ""
                if preview_content:
                    preview_block = (
                        '<div style="font-size: 0.9em; color: #4a5568; margin: 4px 0;">'
                        f'{html.escape(preview_content)}</div>'
                    )
                summary_block = ""
                if result_summary:
                    summary_block = (
                        '<div style="font-size: 0.9em; color: #2d3748; font-weight: 500;">'
                        f'{html.escape(result_summary)}</div>'
                    )

                html_parts.append(f"""
                <div style="
                    border: 1px solid #e2e8f0;
                    border-radius: 8px;
                    padding: 12px;
                    margin: 8px 0;
                    background: white;
                    box-shadow: 0 1px 3px rgba(0,0,0,0.1);
                ">
                    <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px;">
                        <div style="
                            background: {tag_bg};
                            color: {tag_color};
                            padding: 4px 8px;
                            border-radius: 12px;
                            font-size: 0.8em;
                            font-weight: 600;
                            display: inline-flex;
                            align-items: center;
                            gap: 4px;
                        ">
                            {icon} {tag_text}
                        </div>
                        <div style="font-size: 0.8em; color: #718096;">
                            {timestamp}
                        </div>
                    </div>
                    {preview_block}
                    {summary_block}
                </div>
                """)

            html_parts.append("</div>")
            return ''.join(html_parts)
        except Exception as e:
            print(f"Error formatting history for display: {str(e)}")
            return f"""
            <div style="text-align: center; padding: 20px; color: #e53e3e;">
                <p>Error loading history records: {html.escape(str(e))}</p>
            </div>
            """

    def get_search_statistics(self) -> dict:
        """Get search statistics information.

        Returns:
            dict: ``total_searches``, ``criteria_searches``,
            ``description_searches``, ``most_searched_breeds`` (breed ->
            number of stored results naming it) and
            ``search_frequency_by_day`` (date part of the timestamp ->
            number of searches). All counters are zero when reading or
            counting failed.
        """
        try:
            history = self.get_history()

            stats = self._empty_statistics()
            stats['total_searches'] = len(history)
            breed_counts = stats['most_searched_breeds']
            day_counts = stats['search_frequency_by_day']

            for entry in history:
                # Anything that is not a description search counts as criteria.
                if entry.get('search_type', 'criteria') == 'description':
                    stats['description_searches'] += 1
                else:
                    stats['criteria_searches'] += 1

                # Count breed search frequency
                for result in entry.get('results') or []:
                    breed = result.get('breed', 'Unknown')
                    breed_counts[breed] = breed_counts.get(breed, 0) + 1

                # Count search frequency by date
                timestamp = entry.get('timestamp', '')
                if timestamp:
                    date = timestamp.split(' ')[0]
                    day_counts[date] = day_counts.get(date, 0) + 1

            return stats
        except Exception as e:
            print(f"Error getting search statistics: {str(e)}")
            return self._empty_statistics()