import gradio as gr import threading import os import shutil import tempfile import time import json from util import process_text_to_image, download_and_check_result_nsfw, GoodWebsiteUrl from nfsw import NSFWDetector # Simplified i18n - English only translations = { "en": { "header_title": "🎨 AI Text to Image Generator", "prompt_input_label": "Enter your prompt", "prompt_input_placeholder": "Describe the image you want to generate...", "aspect_ratio_label": "Aspect Ratio", "generate_button": "🚀 Generate Image", "output_image_label": "Generated Image", "status_output_label": "Status", "prompt_examples_header": "### 💡 Prompt Examples", "error_enter_prompt": "Please enter a prompt", "error_prompt_too_short": "❌ Prompt must be more than 3 characters", "error_request_processing": "❌ Request processing error", "error_free_limit_reached": "❌ You have reached the free generation limit. Please visit https://omnicreator.net/text-to-image for unlimited generation", "error_free_limit_wait": "❌ Please wait {wait_minutes_int} minutes before generating again, or visit https://omnicreator.net/text-to-image for unlimited generation", "status_completed_message": "✅ {message}", "status_processing_completed": "Processing completed", "error_processing_failed": "❌ {message}", "error_processing_exception": "❌ Error occurred: {error}", "warning_content_filter": "⚠️ Content filter applied. For unlimited creative freedom, visit https://omnicreator.net/text-to-image", "status_checking_result": "Checking result...", "status_applying_filter": "Applying filter...", } } def t(key, lang="en"): return translations.get(lang, {}).get(key, key) # Configuration parameters TIP_TRY_N = 4 # Show like button tip after 12 tries FREE_TRY_N = 8 # Free phase: first 15 tries without restrictions SLOW_TRY_N = 12 # Slow phase start: 25 tries SLOW2_TRY_N = 18 # Slow phase start: 32 tries RATE_LIMIT_60 = 25 # Full restriction: blocked after 40 tries # Time window configuration (minutes) PHASE_1_WINDOW = 3 # 15-25 tries: 5 minutes PHASE_2_WINDOW = 10 # 25-32 tries: 10 minutes PHASE_3_WINDOW = 20 # 32-40 tries: 20 minutes MAX_IMAGES_PER_WINDOW = 2 # Max images per time window high_priority_n = 2 # 每个ip开头几个任务是高优先级的 IP_Dict = {} # IP generation statistics and time window tracking IP_Generation_Count = {} # Record total generation count for each IP IP_Rate_Limit_Track = {} # Record generation count and timestamp in current time window for each IP IP_Country_Cache = {} # Cache IP country information to avoid repeated queries # Country usage statistics Country_Usage_Stats = {} # Track usage count by country Total_Request_Count = 0 # Total request counter for periodic printing PRINT_STATS_INTERVAL = 10 # Print stats every N requests # Async IP query tracking IP_Query_Results = {} # Track async query results # Restricted countries list (these countries have lower usage limits) RESTRICTED_COUNTRIES = ["印度", "巴基斯坦"] RESTRICTED_COUNTRY_LIMIT = 2 # Max usage for restricted countries country_dict = { "zh": ["中国", "香港"], "fi": ["芬兰"], "en": ["美国", "澳大利亚", "英国", "加拿大", "新西兰", "爱尔兰"], "es": ["西班牙", "墨西哥", "阿根廷", "哥伦比亚", "智利", "秘鲁"], "pt": ["葡萄牙", "巴西"], "fr": ["法国", "摩纳哥"], "de": ["德国", "奥地利", ], "it": ["意大利", "圣马力诺", "梵蒂冈"], "ja": ["日本"], "ru": ["俄罗斯"], "uk": ["乌克兰"], "ar": ["沙特阿拉伯", "埃及", "阿拉伯联合酋长国", "摩洛哥"], "nl":["荷兰"], "no":["挪威"], "sv":["瑞典"], "id":["印度尼西亚"], "vi": ["越南"], "he": ["以色列"], "tr": ["土耳其"], "da": ["丹麦"], } def query_ip_country(client_ip): """ Query IP address geo information with robust error handling Features: - 3 second timeout limit - Comprehensive error handling - Automatic fallback to default values - Cache mechanism to avoid repeated queries Returns: dict: {"country": str, "region": str, "city": str} """ # Check cache first - no API call for subsequent visits if client_ip in IP_Country_Cache: print(f"Using cached IP data for {client_ip}") return IP_Country_Cache[client_ip] # Validate IP address if not client_ip or client_ip in ["127.0.0.1", "localhost", "::1"]: print(f"Invalid or local IP address: {client_ip}, using default") default_geo = {"country": "Unknown", "region": "Unknown", "city": "Unknown"} IP_Country_Cache[client_ip] = default_geo return default_geo # First time visit - query API with robust error handling print(f"Querying IP geolocation for {client_ip}...") try: import requests from requests.exceptions import Timeout, ConnectionError, RequestException api_url = f"https://api.vore.top/api/IPdata?ip={client_ip}" # Make request with 3 second timeout response = requests.get(api_url, timeout=3) if response.status_code == 200: data = response.json() if data.get("code") == 200 and "ipdata" in data: ipdata = data["ipdata"] geo_info = { "country": ipdata.get("info1", "Unknown"), "region": ipdata.get("info2", "Unknown"), "city": ipdata.get("info3", "Unknown") } IP_Country_Cache[client_ip] = geo_info print(f"Successfully detected location for {client_ip}: {geo_info['country']}") return geo_info else: print(f"API returned invalid data for {client_ip}: {data}") else: print(f"API request failed with status {response.status_code} for {client_ip}") except Timeout: print(f"Timeout (>3s) querying IP location for {client_ip}, using default") except ConnectionError: print(f"Network connection error for IP {client_ip}, using default") except RequestException as e: print(f"Request error for IP {client_ip}: {e}, using default") except Exception as e: print(f"Unexpected error querying IP {client_ip}: {e}, using default") # All failures lead here - cache default and return default_geo = {"country": "Unknown", "region": "Unknown", "city": "Unknown"} IP_Country_Cache[client_ip] = default_geo print(f"Cached default location for {client_ip}") return default_geo def query_ip_country_async(client_ip): """ Async version that returns immediately with default, then updates cache in background Returns: tuple: (immediate_lang, geo_info_or_none) """ # If already cached, return immediately if client_ip in IP_Country_Cache: geo_info = IP_Country_Cache[client_ip] lang = get_lang_from_country(geo_info["country"]) return lang, geo_info # Return default immediately, query in background return "en", None def get_lang_from_country(country): """ Map country name to language code with comprehensive validation Features: - Handles invalid/empty input - Case-insensitive matching - Detailed logging - Always returns valid language code Args: country (str): Country name Returns: str: Language code (always valid, defaults to "en") """ # Input validation if not country or not isinstance(country, str) or country.strip() == "": print(f"Invalid country provided: '{country}', defaulting to English") return "en" # Normalize country name country = country.strip() if country.lower() == "unknown": print(f"Unknown country, defaulting to English") return "en" try: # Search in country dictionary with case-sensitive match first for lang, countries in country_dict.items(): if country in countries: print(f"Matched country '{country}' to language '{lang}'") return lang # If no exact match, try case-insensitive match country_lower = country.lower() for lang, countries in country_dict.items(): for country_variant in countries: if country_variant.lower() == country_lower: print(f"Case-insensitive match: country '{country}' to language '{lang}'") return lang # No match found print(f"Country '{country}' not found in country_dict, defaulting to English") return "en" except Exception as e: print(f"Error matching country '{country}': {e}, defaulting to English") return "en" def get_lang_from_ip(client_ip): """ Get language based on IP geolocation with comprehensive error handling Features: - Validates input IP address - Handles all possible exceptions - Always returns a valid language code - Defaults to English on any failure - Includes detailed logging Args: client_ip (str): Client IP address Returns: str: Language code (always valid, defaults to "en") """ # Input validation if not client_ip or not isinstance(client_ip, str): print(f"Invalid IP address provided: {client_ip}, defaulting to English") return "en" try: # Query geolocation info (has its own error handling and 3s timeout) geo_info = query_ip_country(client_ip) if not geo_info or not isinstance(geo_info, dict): print(f"No geolocation data for {client_ip}, defaulting to English") return "en" # Extract country with fallback country = geo_info.get("country", "Unknown") if not country or country == "Unknown": print(f"Unknown country for IP {client_ip}, defaulting to English") return "en" # Map country to language detected_lang = get_lang_from_country(country) # Validate language code if not detected_lang or not isinstance(detected_lang, str) or len(detected_lang) != 2: print(f"Invalid language code '{detected_lang}' for {client_ip}, defaulting to English") return "en" print(f"IP {client_ip} -> Country: {country} -> Language: {detected_lang}") return detected_lang except Exception as e: print(f"Unexpected error getting language from IP {client_ip}: {e}, defaulting to English") return "en" # Always return a valid language code def is_restricted_country_ip(client_ip): """ Check if IP is from a restricted country Returns: bool: True if from restricted country """ geo_info = query_ip_country(client_ip) country = geo_info["country"] return country in RESTRICTED_COUNTRIES def get_ip_max_limit(client_ip): """ Get max usage limit for IP based on country Returns: int: Max usage limit """ if is_restricted_country_ip(client_ip): return RESTRICTED_COUNTRY_LIMIT else: return RATE_LIMIT_60 def get_ip_generation_count(client_ip): """ Get IP generation count """ if client_ip not in IP_Generation_Count: IP_Generation_Count[client_ip] = 0 return IP_Generation_Count[client_ip] def increment_ip_generation_count(client_ip): """ Increment IP generation count """ if client_ip not in IP_Generation_Count: IP_Generation_Count[client_ip] = 0 IP_Generation_Count[client_ip] += 1 return IP_Generation_Count[client_ip] def get_ip_phase(client_ip): """ Get current phase for IP Returns: str: 'free', 'rate_limit_1', 'rate_limit_2', 'rate_limit_3', 'blocked' """ count = get_ip_generation_count(client_ip) max_limit = get_ip_max_limit(client_ip) # For restricted countries, check if they've reached their limit if is_restricted_country_ip(client_ip): if count >= max_limit: return 'blocked' elif count >= max_limit - 2: # Last 2 attempts return 'rate_limit_3' elif count >= max_limit - 3: # 3rd attempt from end return 'rate_limit_2' elif count >= max_limit - 4: # 4th attempt from end return 'rate_limit_1' else: return 'free' # For normal countries, use standard limits if count < FREE_TRY_N: return 'free' elif count < SLOW_TRY_N: return 'rate_limit_1' # NSFW blur + 5 minutes 2 images elif count < SLOW2_TRY_N: return 'rate_limit_2' # NSFW blur + 10 minutes 2 images elif count < max_limit: return 'rate_limit_3' # NSFW blur + 20 minutes 2 images else: return 'blocked' # Generation blocked def check_rate_limit_for_phase(client_ip, phase): """ Check rate limit for specific phase Returns: tuple: (is_limited, wait_time_minutes, current_count) """ if phase not in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: return False, 0, 0 # Determine time window if phase == 'rate_limit_1': window_minutes = PHASE_1_WINDOW elif phase == 'rate_limit_2': window_minutes = PHASE_2_WINDOW else: # rate_limit_3 window_minutes = PHASE_3_WINDOW current_time = time.time() window_key = f"{client_ip}_{phase}" # Clean expired records if window_key in IP_Rate_Limit_Track: track_data = IP_Rate_Limit_Track[window_key] # Check if within current time window if current_time - track_data['start_time'] > window_minutes * 60: # Time window expired, reset IP_Rate_Limit_Track[window_key] = { 'count': 0, 'start_time': current_time, 'last_generation': current_time } else: # Initialize IP_Rate_Limit_Track[window_key] = { 'count': 0, 'start_time': current_time, 'last_generation': current_time } track_data = IP_Rate_Limit_Track[window_key] # Check if exceeded limit if track_data['count'] >= MAX_IMAGES_PER_WINDOW: # Calculate remaining wait time elapsed = current_time - track_data['start_time'] wait_time = (window_minutes * 60) - elapsed wait_minutes = max(0, wait_time / 60) return True, wait_minutes, track_data['count'] return False, 0, track_data['count'] def update_country_stats(client_ip): """ Update country usage statistics and print periodically """ global Total_Request_Count, Country_Usage_Stats # Get country info geo_info = IP_Country_Cache.get(client_ip, {"country": "Unknown", "region": "Unknown", "city": "Unknown"}) country = geo_info["country"] # Update country stats if country not in Country_Usage_Stats: Country_Usage_Stats[country] = 0 Country_Usage_Stats[country] += 1 # Increment total request counter Total_Request_Count += 1 # Print stats every N requests if Total_Request_Count % PRINT_STATS_INTERVAL == 0: print("\n" + "="*60) print(f"📊 国家使用统计 (总请求数: {Total_Request_Count})") print("="*60) # Sort by usage count (descending) sorted_stats = sorted(Country_Usage_Stats.items(), key=lambda x: x[1], reverse=True) for country_name, count in sorted_stats: percentage = (count / Total_Request_Count) * 100 print(f" {country_name}: {count} 次 ({percentage:.1f}%)") print("="*60 + "\n") def record_generation_attempt(client_ip, phase): """ Record generation attempt """ # Increment total count increment_ip_generation_count(client_ip) # Update country statistics update_country_stats(client_ip) # Record time window count if phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: window_key = f"{client_ip}_{phase}" current_time = time.time() if window_key in IP_Rate_Limit_Track: IP_Rate_Limit_Track[window_key]['count'] += 1 IP_Rate_Limit_Track[window_key]['last_generation'] = current_time else: IP_Rate_Limit_Track[window_key] = { 'count': 1, 'start_time': current_time, 'last_generation': current_time } def apply_gaussian_blur_to_image_url(image_url, blur_strength=50): """ Apply Gaussian blur to image URL Args: image_url (str): Original image URL blur_strength (int): Blur strength, default 50 (heavy blur) Returns: PIL.Image: Blurred PIL Image object """ try: import requests from PIL import Image, ImageFilter import io # Download image response = requests.get(image_url, timeout=30) if response.status_code != 200: return None # Convert to PIL Image image_data = io.BytesIO(response.content) image = Image.open(image_data) # Apply heavy Gaussian blur blurred_image = image.filter(ImageFilter.GaussianBlur(radius=blur_strength)) return blurred_image except Exception as e: print(f"⚠️ Failed to apply Gaussian blur: {e}") return None # Initialize NSFW detector (download from Hugging Face) try: nsfw_detector = NSFWDetector() # Auto download falconsai_yolov9_nsfw_model_quantized.pt from Hugging Face print("✅ NSFW detector initialized successfully") except Exception as e: print(f"❌ NSFW detector initialization failed: {e}") nsfw_detector = None def generate_image_interface(prompt, aspect_ratio, lang, request: gr.Request, progress=gr.Progress()): """ Interface function for text-to-image generation with phase-based limitations """ try: # Extract user IP client_ip = request.client.host x_forwarded_for = dict(request.headers).get('x-forwarded-for') if x_forwarded_for: client_ip = x_forwarded_for if client_ip not in IP_Dict: IP_Dict[client_ip] = 0 IP_Dict[client_ip] += 1 if not prompt or prompt.strip() == "": return None, t("error_enter_prompt", lang), gr.update(visible=False) # Check if prompt length is greater than 3 characters if len(prompt.strip()) <= 3: return None, t("error_prompt_too_short", lang), gr.update(visible=False) # Parse aspect ratio to get width and height aspect_ratios = { "16:9": (1364, 768), "4:3": (1182, 887), "1:1": (1024, 1024), "3:4": (887, 1182), "9:16": (768, 1364) } width, height = aspect_ratios.get(aspect_ratio, (1024, 1024)) except Exception as e: print(f"⚠️ Request preprocessing error: {e}") return None, t("error_request_processing", lang), gr.update(visible=False) # Get user current phase current_phase = get_ip_phase(client_ip) current_count = get_ip_generation_count(client_ip) geo_info = IP_Country_Cache.get(client_ip, {"country": "Unknown", "region": "Unknown", "city": "Unknown"}) is_restricted = is_restricted_country_ip(client_ip) print(f"📊 User phase info - IP: {client_ip}, Location: {geo_info['country']}/{geo_info['region']}/{geo_info['city']}, Phase: {current_phase}, Count: {current_count}, Restricted: {is_restricted}") # Check if user reached the like button tip threshold # For restricted countries, show like tip from the first attempt show_like_tip = (current_count >= 1) if is_restricted else (current_count >= TIP_TRY_N) # Check if completely blocked if current_phase == 'blocked': # Generate blocked limit button with different URL for restricted countries if is_restricted: blocked_url = GoodWebsiteUrl else: blocked_url = 'https://omnicreator.net/text-to-image' blocked_button_html = f"""
""" # Use same message for all users to avoid discrimination perception blocked_message = t("error_free_limit_reached", lang) return None, blocked_message, gr.update(value=blocked_button_html, visible=True) # Check rate limit (applies to rate_limit phases) if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: is_limited, wait_minutes, window_count = check_rate_limit_for_phase(client_ip, current_phase) if is_limited: wait_minutes_int = int(wait_minutes) + 1 # Generate rate limit button with different URL for restricted countries if is_restricted: rate_limit_url = GoodWebsiteUrl else: rate_limit_url = 'https://omnicreator.net/text-to-image' rate_limit_button_html = f""" """ return None, t("error_free_limit_wait", lang).format(wait_minutes_int=wait_minutes_int), gr.update(value=rate_limit_button_html, visible=True) # Handle NSFW detection based on phase is_nsfw_task = False # Track if this task involves NSFW content result_url = None status_message = "" def progress_callback(message): try: nonlocal status_message status_message = message # Add error handling to prevent progress update failure if progress is not None: # Enhanced progress display with better formatting if "Queue:" in message or "tasks ahead" in message: # Queue status - show with different progress value to indicate waiting progress(0.1, desc=message) elif "Processing" in message or "AI is processing" in message: # Processing status progress(0.7, desc=message) elif "Generating" in message or "Almost done" in message: # Generation status progress(0.9, desc=message) else: # Default status progress(0.5, desc=message) except Exception as e: print(f"⚠️ Progress update failed: {e}") try: # Determine priority before recording generation attempt # First high_priority_n tasks for each IP get priority=1 task_priority = 1 if current_count < high_priority_n else 0 # Record generation attempt (before actual generation to ensure correct count) record_generation_attempt(client_ip, current_phase) updated_count = get_ip_generation_count(client_ip) print(f"✅ Processing started - IP: {client_ip}, phase: {current_phase}, total count: {updated_count}, priority: {task_priority}, prompt: {prompt.strip()}, width: {width}, height: {height}", flush=True) # Call text-to-image processing function with priority result_url, message, task_uuid = process_text_to_image(prompt.strip(), width, height, progress_callback, priority=task_priority) if result_url: print(f"✅ Generation completed successfully - IP: {client_ip}, result_url: {result_url}, task_uuid: {task_uuid}", flush=True) # Detect result image NSFW content (only in rate limit phases) if nsfw_detector is not None and current_phase != 'free': try: if progress is not None: progress(0.9, desc=t("status_checking_result", lang)) is_nsfw, nsfw_error = download_and_check_result_nsfw(result_url, nsfw_detector) if nsfw_error: print(f"⚠️ Result image NSFW detection error - IP: {client_ip}, error: {nsfw_error}") elif is_nsfw: is_nsfw_task = True # Mark task as NSFW print(f"🔍 Result image NSFW detected in {current_phase} phase: ❌❌❌ - IP: {client_ip} (will blur result)") else: print(f"🔍 Result image NSFW check passed: ✅✅✅ - IP: {client_ip}") except Exception as e: print(f"⚠️ Result image NSFW detection exception - IP: {client_ip}, error: {str(e)}") # Apply blur if this is an NSFW task in rate limit phases should_blur = False if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3'] and is_nsfw_task: should_blur = True # Apply blur processing if should_blur: if progress is not None: progress(0.95, desc=t("status_applying_filter", lang)) blurred_image = apply_gaussian_blur_to_image_url(result_url) if blurred_image is not None: final_result = blurred_image # Return PIL Image object final_message = t("warning_content_filter", lang) print(f"🔒 Applied Gaussian blur for NSFW content - IP: {client_ip}") else: # Blur failed, return original URL with warning final_result = result_url final_message = t("warning_content_review", lang) # Generate NSFW button for blurred content with different URL for restricted countries if is_restricted: nsfw_url = GoodWebsiteUrl else: nsfw_url = 'https://omnicreator.net/text-to-image' nsfw_action_buttons_html = f""" """ return final_result, final_message, gr.update(value=nsfw_action_buttons_html, visible=True) else: final_result = result_url final_message = t("status_completed_message", lang).format(message=message) try: if progress is not None: progress(1.0, desc=t("status_processing_completed", lang)) except Exception as e: print(f"⚠️ Final progress update failed: {e}") # Generate action buttons HTML action_buttons_html = "" # For restricted countries, only show like tip (no action buttons) if is_restricted: if show_like_tip: action_buttons_html = """Experience the ultimate freedom in AI image creation! Generate unlimited images from text without restrictions, with complete creative freedom and no limits on your imagination.
Join thousands of creators who trust Omni Creator for unrestricted AI image generation!
Transform your ideas into stunning visuals with our advanced AI text-to-image platform. Whether you're creating art, designing content, or exploring creative possibilities - our powerful AI brings your words to life.
Premium users enjoy unlimited image generation without daily limits, rate restrictions, or content barriers. Create as many images as you need, whenever you need them.
Generate any type of content with complete creative freedom. Unlimited possibilities for artists, designers, and content creators.
Advanced AI infrastructure delivers high-quality results in seconds. No waiting in queues, no processing delays - just instant image generation.
Generate images in various aspect ratios - from square 1:1 to landscape 16:9 and portrait 9:16. Perfect for any platform or use case.
State-of-the-art AI models deliver exceptional quality and realism. Professional results suitable for commercial use and high-end projects.
Easy-to-use interface - just enter your prompt and choose an aspect ratio. No complex settings or technical knowledge required.
Use detailed, specific prompts for better results. Describe colors, styles, lighting, and composition clearly.
Include style keywords like "photorealistic", "oil painting", "anime", "3D render" to guide the aesthetic.
Choose the aspect ratio that fits your use case - 1:1 for social media, 16:9 for presentations, 9:16 for mobile.
Add specific details about mood, time of day, camera angle, and other elements for more control.