import gradio as gr
import torch
from PIL import Image
import numpy as np
import os
from pathlib import Path
from datetime import datetime
import tempfile
import time
import psutil
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
from functools import partial
import logging
from transformers import AutoProcessor, AutoModelForVision2Seq
from model import RadarDetectionModel
from feature_extraction import (calculate_amplitude, classify_amplitude,
                                calculate_distribution_range, classify_distribution_range,
                                calculate_attenuation_rate, classify_attenuation_rate,
                                count_reflections, classify_reflections,
                                extract_features)
from report_generation import generate_report, render_report
from utils import plot_detection
from database import save_report, get_report_history
from config import MODEL_NAME

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Set theme and styling
THEME = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="indigo",
    neutral_hue="slate",
    radius_size=gr.themes.sizes.radius_sm,
    text_size=gr.themes.sizes.text_md,
)

# Use a simple dark-mode flag instead of a custom theme
DARK_MODE = False

# Global variables
model = None
USE_DEMO_MODE = False
# Read the token; the misspelled HF_TOCKEN variable is kept as a legacy fallback
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HF_TOCKEN")
# Flag recording whether model initialization has already been attempted
MODEL_INIT_ATTEMPTED = False

class TechnicalReportGenerator:
    def __init__(self):
        self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def generate_model_analysis(self, model_outputs):
        """Generate model-specific analysis section"""
        model_section = "## Model Analysis\n\n"
        # Image encoder analysis
        model_section += "### Image Encoder (SigLIP-So400m) Analysis\n"
        model_section += "- Feature extraction quality: {:.2f}%\n".format(model_outputs.get('feature_quality', 0) * 100)
        model_section += "- Image encoding latency: {:.2f}ms\n".format(model_outputs.get('encoding_latency', 0))
        model_section += "- Feature map dimensions: {}\n\n".format(model_outputs.get('feature_dimensions', 'N/A'))
        # Text decoder analysis
        model_section += "### Text Decoder (Gemma-2B) Analysis\n"
        model_section += "- Text generation confidence: {:.2f}%\n".format(model_outputs.get('text_confidence', 0) * 100)
        model_section += "- Decoding latency: {:.2f}ms\n".format(model_outputs.get('decoding_latency', 0))
        model_section += "- Token processing rate: {:.2f} tokens/sec\n\n".format(model_outputs.get('token_rate', 0))
        return model_section
    def generate_detection_analysis(self, detection_results):
        """Generate detailed detection analysis section"""
        detection_section = "## Detection Analysis\n\n"
        # Detection metrics
        detection_section += "### Object Detection Metrics\n"
        detection_section += "| Metric | Value |\n"
        detection_section += "|--------|-------|\n"
        detection_section += "| Detection Count | {} |\n".format(len(detection_results.get('boxes', [])))
        # Guard against an empty score list, which would make np.mean() return NaN
        detection_section += "| Average Confidence | {:.2f}% |\n".format(
            np.mean(detection_results.get('scores') or [0]) * 100
        )
        detection_section += "| Processing Time | {:.2f}ms |\n\n".format(
            detection_results.get('processing_time', 0)
        )
        # Detailed detection results
        detection_section += "### Detection Details\n"
        detection_section += "| Object | Confidence | Bounding Box |\n"
        detection_section += "|--------|------------|---------------|\n"
        boxes = detection_results.get('boxes', [])
        scores = detection_results.get('scores', [])
        labels = detection_results.get('labels', [])
        for box, score, label in zip(boxes, scores, labels):
            detection_section += "| {} | {:.2f}% | {} |\n".format(
                label,
                score * 100,
                [round(coord, 2) for coord in box]
            )
        return detection_section
    def generate_multimodal_analysis(self, mm_results):
        """Generate multimodal analysis section"""
        mm_section = "## Multimodal Analysis\n\n"
        # Feature correlation analysis
        mm_section += "### Feature Correlation Analysis\n"
        mm_section += "- Text-Image Alignment Score: {:.2f}%\n".format(
            mm_results.get('alignment_score', 0) * 100
        )
        mm_section += "- Cross-Modal Coherence: {:.2f}%\n".format(
            mm_results.get('coherence_score', 0) * 100
        )
        mm_section += "- Feature Space Correlation: {:.2f}\n\n".format(
            mm_results.get('feature_correlation', 0)
        )
        return mm_section

    def generate_performance_metrics(self, perf_data):
        """Generate performance metrics section"""
        perf_section = "## Performance Metrics\n\n"
        # System metrics
        perf_section += "### System Performance\n"
        perf_section += "- Total Processing Time: {:.2f}ms\n".format(perf_data.get('total_time', 0))
        perf_section += "- Peak Memory Usage: {:.2f}MB\n".format(perf_data.get('peak_memory', 0))
        perf_section += "- GPU Utilization: {:.2f}%\n\n".format(perf_data.get('gpu_util', 0))
        # Pipeline metrics
        perf_section += "### Pipeline Statistics\n"
        perf_section += "| Stage | Time (ms) | Memory (MB) |\n"
        perf_section += "|-------|------------|-------------|\n"
        pipeline_stages = perf_data.get('pipeline_stats', {})
        for stage, stats in pipeline_stages.items():
            perf_section += "| {} | {:.2f} | {:.2f} |\n".format(
                stage,
                stats.get('time', 0),
                stats.get('memory', 0)
            )
        return perf_section

    def generate_report(self, results):
        """Generate comprehensive technical report"""
        report = f"# Technical Analysis Report\nGenerated at: {self.timestamp}\n\n"
        # Add model analysis
        report += self.generate_model_analysis(results.get('model_outputs', {}))
        # Add detection analysis
        report += self.generate_detection_analysis(results.get('detection_results', {}))
        # Add multimodal analysis
        report += self.generate_multimodal_analysis(results.get('multimodal_results', {}))
        # Add performance metrics
        report += self.generate_performance_metrics(results.get('performance_data', {}))
        return report

def check_available_memory():
    """Check available system memory in MB"""
    try:
        vm = psutil.virtual_memory()  # psutil is already imported at module level
        available_mb = vm.available / (1024 * 1024)
        total_mb = vm.total / (1024 * 1024)
        print(f"Available memory: {available_mb:.2f}MB out of {total_mb:.2f}MB total")
        return available_mb
    except Exception as e:
        print(f"Error checking memory: {str(e)}")
        return 0

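# NOTE: monitor_memory_during_loading() below is an alternative, memory-aware
# loading path for the raw transformers pipeline. It is not currently wired
# into initialize_model(), which loads the model through RadarDetectionModel
# instead; it is kept here as a reference for loading with 8-bit quantization.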
def monitor_memory_during_loading(model_name, use_auth_token=None):
    """Monitor memory usage during model loading and abort if it gets too high"""
    global USE_DEMO_MODE
    try:
        # Initial memory check
        initial_memory = get_memory_usage()
        print(f"Initial memory usage: {initial_memory:.2f}MB")
        # Start loading processor
        print(f"Loading processor from {model_name}")
        if use_auth_token:
            processor = AutoProcessor.from_pretrained(model_name, use_auth_token=use_auth_token)
        else:
            processor = AutoProcessor.from_pretrained(model_name)
        # Check memory after processor loading
        after_processor_memory = get_memory_usage()
        print(f"Memory after processor loading: {after_processor_memory:.2f}MB (Δ: {after_processor_memory - initial_memory:.2f}MB)")
        # Check if memory is getting too high
        available_memory = check_available_memory()
        if available_memory < 4000:  # Less than 4GB available
            print(f"Warning: Only {available_memory:.2f}MB memory available after loading processor")
            print("Aborting model loading to avoid out-of-memory error")
            USE_DEMO_MODE = True
            return None, None
        # Start loading model with 8-bit quantization
        print(f"Loading model from {model_name} with 8-bit quantization")
        if use_auth_token:
            model = AutoModelForVision2Seq.from_pretrained(
                model_name,
                use_auth_token=use_auth_token,
                load_in_8bit=True,
                device_map="auto"
            )
        else:
            model = AutoModelForVision2Seq.from_pretrained(
                model_name,
                load_in_8bit=True,
                device_map="auto"
            )
        # Check memory after model loading
        after_model_memory = get_memory_usage()
        print(f"Memory after model loading: {after_model_memory:.2f}MB (Δ: {after_model_memory - after_processor_memory:.2f}MB)")
        # Set model to evaluation mode
        model.eval()
        return processor, model
    except Exception as e:
        print(f"Error during monitored model loading: {str(e)}")
        USE_DEMO_MODE = True
        return None, None

def is_running_in_space():
    """Check if we're running in a Hugging Face Space environment"""
    return os.environ.get("SPACE_ID") is not None

def is_container_environment():
    """Check if we're running in a container environment"""
    return os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv")

def is_cpu_only():
    """Check if we're running in a CPU-only environment"""
    return not torch.cuda.is_available()

def is_low_memory_environment():
    """Check if we're running in a low-memory environment"""
    available_memory = check_available_memory()
    return available_memory < 8000  # Less than 8GB available

def is_development_environment():
    """Check if we're running in a development environment"""
    return not (is_running_in_space() or is_container_environment())

def is_debug_mode():
    """Check if we're running in debug mode"""
    return os.environ.get("DEBUG", "").lower() in ("1", "true", "yes")

def is_test_mode():
    """Check if we're running in test mode"""
    return os.environ.get("TEST", "").lower() in ("1", "true", "yes")

def is_low_memory_container():
    """Check if we're running in a container with memory limits"""
    if not is_container_environment():
        return False
    # Check whether a cgroup (v1) memory limit is set
    try:
        with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f:
            limit = int(f.read().strip())
        # Convert to MB
        limit_mb = limit / (1024 * 1024)
        print(f"Container memory limit: {limit_mb:.2f}MB")
        return limit_mb < 20000  # Less than 20GB
    except (OSError, ValueError):
        # If we can't read the limit (e.g. cgroup v2), assume a low-memory container
        return True

def is_space_hardware_type(hardware_type):
    """Check if we're running in a Hugging Face Space with a specific hardware type"""
    if not is_running_in_space():
        return False
    # Check if the SPACE_HARDWARE environment variable matches the specified type
    return os.environ.get("SPACE_HARDWARE", "").lower() == hardware_type.lower()

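# get_space_hardware_type() is called by the startup checks and the demo-mode
# banner but was missing from this file. This minimal sketch assumes the raw
# SPACE_HARDWARE value (the same variable get_space_hardware_tier reads below)
# is what the callers want to display.
def get_space_hardware_type():
    """Get the raw hardware type of the Hugging Face Space"""
    if not is_running_in_space():
        return "unknown"
    return os.environ.get("SPACE_HARDWARE", "unknown")
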
def get_space_hardware_tier():
    """Get the hardware tier of the Hugging Face Space"""
    if not is_running_in_space():
        return "Not a Space"
    hardware = os.environ.get("SPACE_HARDWARE", "unknown").lower()
    # Map hardware type to tier name
    tiers = {
        "cpu": "Basic (CPU)",
        "t4-small": "Basic (GPU)",
        "t4-medium": "Standard",
        "a10g-small": "Pro",
        "a10g-large": "Pro+",
        "a100-large": "Enterprise",
    }
    return tiers.get(hardware, f"Unknown ({hardware})")

def get_space_hardware_memory():
    """Get the memory size of the Hugging Face Space hardware in GB"""
    if not is_running_in_space():
        return 0
    hardware = os.environ.get("SPACE_HARDWARE", "unknown").lower()
    # Map hardware type to memory size in GB, defaulting to 16GB
    memory_gb = {
        "cpu": 16,
        "t4-small": 16,
        "t4-medium": 16,
        "a10g-small": 24,
        "a10g-large": 40,
        "a100-large": 80,
    }
    return memory_gb.get(hardware, 16)

def get_total_system_memory():
    """Get total system memory in MB"""
    try:
        total_bytes = psutil.virtual_memory().total  # psutil imported at module level
        return total_bytes / (1024 * 1024)
    except Exception as e:
        print(f"Error getting total system memory: {str(e)}")
        return 0

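# get_recommended_space_tier() is referenced by the startup checks and the
# demo-mode banner but was missing from this file. This is a minimal sketch:
# it recommends the smallest tier (memory sizes per get_space_hardware_memory)
# whose memory covers the estimated model requirement; adjust the table as
# needed for your account's available hardware.
def get_recommended_space_tier():
    """Recommend the smallest Space tier with enough memory for the model"""
    required_gb = estimate_model_memory_requirements() / 1024
    # (memory in GB, tier label) in ascending order of capacity
    options = [(16, "Standard"), (24, "Pro"), (40, "Pro+"), (80, "Enterprise")]
    for memory_gb, tier in options:
        if memory_gb >= required_gb:
            return tier
    return "Enterprise"
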
def estimate_model_memory_requirements():
    """Estimate the memory requirements for the model in MB.

    This is a placeholder implementation; a more accurate estimate would
    account for the model architecture, quantization level, and typical
    input sizes.
    """
    # Assumed placeholder: the PaliGemma checkpoint plus working memory does
    # not fit in the 16GB Spaces limit, so use roughly 20GB as a conservative
    # figure. Tune this to your model.
    return 20000

def run_startup_checks():
    """Log environment diagnostics at startup and enable demo mode when the
    environment cannot safely hold the full model."""
    global USE_DEMO_MODE
    try:
        # Print startup message
        print("===== Application Startup at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "=====")
        # Get system memory information
        total_memory = get_total_system_memory()
        required_memory = estimate_model_memory_requirements()
        recommended_tier = get_recommended_space_tier()
        print(f"NOTICE: Total system memory: {total_memory:.2f}MB")
        print(f"NOTICE: Estimated model memory requirement: {required_memory:.2f}MB")
        print(f"NOTICE: Recommended Space tier: {recommended_tier}")
        if is_test_mode():
            print("NOTICE: Running in TEST mode")
            print("NOTICE: Using mock data and responses")
            USE_DEMO_MODE = True
        if is_debug_mode():
            print("NOTICE: Running in DEBUG mode")
            print("NOTICE: Additional logging and diagnostics will be enabled")
        if is_development_environment():
            print("NOTICE: Running in development environment")
            print("NOTICE: Full model capabilities may be available depending on system resources")
        if is_running_in_space():
            print("NOTICE: Running in Hugging Face Space environment")
            # Check Space hardware type
            hardware_type = get_space_hardware_type()
            hardware_tier = get_space_hardware_tier()
            hardware_memory = get_space_hardware_memory()
            print(f"NOTICE: Space hardware type: {hardware_type} (Tier: {hardware_tier}, Memory: {hardware_memory}GB)")
            if has_enough_memory_for_model():
                print("NOTICE: This Space has enough memory for the model, but we're still forcing demo mode for stability")
            else:
                print(f"NOTICE: This Space does NOT have enough memory for the model (Need: {required_memory:.2f}MB, Have: {hardware_memory*1024:.2f}MB)")
                print(f"NOTICE: Recommended Space tier: {recommended_tier}")
                print("NOTICE: FORCING DEMO MODE to avoid 'Memory limit exceeded (16Gi)' error")
                print("NOTICE: The PaliGemma model is too large for the 16GB memory limit in Spaces")
                print("NOTICE: To use the full model, please run this application locally")
            USE_DEMO_MODE = True
        elif is_container_environment():
            print("NOTICE: Running in a container environment")
            print("NOTICE: Memory limits may be enforced by the container runtime")
        if is_cpu_only():
            print("NOTICE: Running in CPU-only environment")
            print("NOTICE: Model loading and inference will be slower")
        # Check available memory
        available_memory = check_available_memory()
        print(f"NOTICE: Available memory: {available_memory:.2f}MB")
        if is_low_memory_environment() and not USE_DEMO_MODE:
            print("NOTICE: Running in a low-memory environment")
            print("NOTICE: Enabling DEMO MODE to avoid memory issues")
            USE_DEMO_MODE = True
        elif available_memory < 8000:  # Less than 8GB available
            print(f"Warning: Only {available_memory:.2f}MB memory available, which may not be enough for the full model")
    except Exception as e:
        print(f"Warning: Startup checks failed: {str(e)}")
        print("Falling back to demo mode.")
        USE_DEMO_MODE = True

def initialize_model():
    """
    Initialize the model only when needed; it is not loaded automatically at
    application startup.
    """
    global model, USE_DEMO_MODE, MODEL_INIT_ATTEMPTED
    # If the model has already been initialized, return it directly
    if model is not None:
        return model
    # If initialization was already attempted and failed, use demo mode
    if MODEL_INIT_ATTEMPTED and model is None:
        logger.info("Model initialization was already attempted and failed; using demo mode")
        USE_DEMO_MODE = True
        return None
    # Mark initialization as attempted
    MODEL_INIT_ATTEMPTED = True
    # Check whether we are running in a Hugging Face Space environment
    if is_running_in_space():
        logger.info("Running in a Hugging Face Space environment")
        # Check available memory
        available_memory = check_available_memory()
        logger.info(f"Available memory: {available_memory:.2f}MB")
        if available_memory < 8000:  # Less than 8GB available
            logger.warning(f"Only {available_memory:.2f}MB of memory available, which may not be enough to load the model")
            logger.info("Using demo mode to avoid memory issues")
            USE_DEMO_MODE = True
            return None
    if USE_DEMO_MODE:
        logger.info("Using demo mode - the model will not be loaded")
        return None  # Use mock data in demo mode
    try:
        # Get the token from the environment (HF_TOCKEN is a legacy misspelling)
        hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HF_TOCKEN")
        logger.info(f"Attempting to load model {MODEL_NAME}")
        model = RadarDetectionModel(model_name=MODEL_NAME, use_auth_token=hf_token)
        logger.info(f"Successfully loaded model {MODEL_NAME}")
        return model
    except Exception as e:
        logger.error(f"Model initialization error: {str(e)}")
        logger.info("Switching to demo mode due to model loading error")
        USE_DEMO_MODE = True
        return None

def create_confidence_chart(scores, labels):
    """Create a bar chart for confidence scores"""
    if not scores or not labels:
        return None
    df = pd.DataFrame({
        'Label': labels,
        'Confidence': [score * 100 for score in scores]
    })
    fig = px.bar(
        df,
        x='Label',
        y='Confidence',
        title='Detection Confidence Scores',
        labels={'Confidence': 'Confidence (%)'},
        color='Confidence',
        color_continuous_scale='viridis'
    )
    fig.update_layout(
        xaxis_title='Detected Object',
        yaxis_title='Confidence (%)',
        yaxis_range=[0, 100],
        template='plotly_white'
    )
    return fig

def create_feature_radar_chart(features):
    """Create a radar chart for feature analysis"""
    categories = list(features.keys())
    values = []
    # Convert text classifications to numeric values (1-5 scale).
    # Compound labels are checked first so that e.g. "Medium-High" is not
    # swallowed by the plain "High" or "Medium" substring tests.
    for feature in features.values():
        if "Medium-High" in feature:
            values.append(4)
        elif "Medium-Low" in feature:
            values.append(2)
        elif "High" in feature:
            values.append(5)
        elif "Low" in feature:
            values.append(1)
        elif "Medium" in feature:
            values.append(3)
        else:
            values.append(0)
    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=values,
        theta=categories,
        fill='toself',
        name='Feature Analysis'
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, 5]
            )
        ),
        title='Feature Analysis Radar Chart',
        template='plotly_white'
    )
    return fig

def create_heatmap(image_array):
    """Create a heatmap visualization of the image intensity"""
    if image_array is None:
        return None
    # Convert to grayscale if needed
    if len(image_array.shape) == 3 and image_array.shape[2] == 3:
        gray_img = np.mean(image_array, axis=2)
    else:
        gray_img = image_array
    fig = px.imshow(
        gray_img,
        color_continuous_scale='inferno',
        title='Signal Intensity Heatmap'
    )
    fig.update_layout(
        xaxis_title='X Position',
        yaxis_title='Y Position',
        template='plotly_white'
    )
    return fig

def cleanup_memory():
    """Attempt to clean up memory by forcing garbage collection"""
    try:
        import gc
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        print("Memory cleanup performed")
    except Exception as e:
        print(f"Error during memory cleanup: {str(e)}")

def process_image_streaming(image, generate_tech_report=False, progress=gr.Progress()):
    """Process an image with streaming progress updates"""
    if image is None:
        raise gr.Error("Please upload an image.")
    # Initialize the model only when needed
    progress(0.1, desc="Initializing model...")
    log_memory_usage("before model initialization in process_image")
    global model, USE_DEMO_MODE
    if not USE_DEMO_MODE:
        model = initialize_model()
        if model is None:
            progress(0.15, desc="Switching to demo mode...")
            USE_DEMO_MODE = True
    try:
        # Convert the input to a PIL Image if needed
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        # Run detection
        progress(0.2, desc="Running detection...")
        log_memory_usage("before detection")
        if USE_DEMO_MODE:
            # Use mock detection results in demo mode
            detection_result = {
                'boxes': [[100, 100, 200, 200], [300, 300, 400, 400]],
                'scores': [0.92, 0.85],
                'labels': ['Crack', 'Corrosion'],
                'image': image
            }
        else:
            try:
                detection_result = model.detect(image)
                log_memory_usage("after detection")
            except Exception as e:
                logger.error(f"Error during detection: {str(e)}")
                # Fall back to demo mode if detection fails
                USE_DEMO_MODE = True
                detection_result = {
                    'boxes': [[100, 100, 200, 200], [300, 300, 400, 400]],
                    'scores': [0.92, 0.85],
                    'labels': ['Error', 'Fallback'],
                    'image': image
                }
        # Extract features
        progress(0.3, desc="Extracting features...")
        features = extract_features(image, detection_result)
        # Create visualization charts
        progress(0.5, desc="Creating visualizations...")
        confidence_chart = create_confidence_chart(
            detection_result.get('scores', []),
            detection_result.get('labels', [])
        )
        feature_chart = create_feature_radar_chart(features)
        heatmap = create_heatmap(np.array(image))
        # Start performance tracking
        progress(0.6, desc="Analyzing performance...")
        start_time = time.time()
        performance_data = {
            'pipeline_stats': {},
            'peak_memory': 0,
            'gpu_util': 0
        }
        # Record per-stage timings (detection already ran above, so this
        # stage's time only covers bookkeeping)
        stage_start = time.time()
        detection_results = detection_result
        detection_results['processing_time'] = (time.time() - stage_start) * 1000
        performance_data['pipeline_stats']['detection'] = {
            'time': detection_results['processing_time'],
            'memory': get_memory_usage()
        }
        # Feature extraction analysis (illustrative fixed values)
        stage_start = time.time()
        model_outputs = {
            'feature_quality': 0.85,
            'encoding_latency': 120.5,
            'feature_dimensions': '768x768',
            'text_confidence': 0.92,
            'decoding_latency': 85.3,
            'token_rate': 45.7
        }
        performance_data['pipeline_stats']['feature_extraction'] = {
            'time': (time.time() - stage_start) * 1000,
            'memory': get_memory_usage()
        }
        # Multimodal analysis (illustrative fixed values)
        stage_start = time.time()
        multimodal_results = {
            'alignment_score': 0.78,
            'coherence_score': 0.82,
            'feature_correlation': 0.75
        }
        performance_data['pipeline_stats']['multimodal_analysis'] = {
            'time': (time.time() - stage_start) * 1000,
            'memory': get_memory_usage()
        }
        # Update performance data
        performance_data['total_time'] = (time.time() - start_time) * 1000
        performance_data['peak_memory'] = get_peak_memory_usage()
        performance_data['gpu_util'] = get_gpu_utilization()
        # Generate the analysis report
        progress(0.8, desc="Generating report...")
        analysis_report = generate_report(detection_result, features)
        # Prepare output
        output_image = plot_detection(image, detection_result)
        if generate_tech_report:
            # Prepare data for the technical report
            tech_report_data = {
                'model_outputs': model_outputs,
                'detection_results': detection_results,
                'multimodal_results': multimodal_results,
                'performance_data': performance_data
            }
            # Generate the technical report
            tech_report = TechnicalReportGenerator().generate_report(tech_report_data)
            # Save the technical report to a temporary file (a fixed filename
            # would race between concurrent users)
            with tempfile.NamedTemporaryFile(mode="w", suffix=".md",
                                             delete=False, encoding="utf-8") as f:
                f.write(tech_report)
                report_path = f.name
            progress(1.0, desc="Analysis complete!")
            # Clean up memory after processing
            cleanup_memory()
            return output_image, analysis_report, report_path, confidence_chart, feature_chart, heatmap
        progress(1.0, desc="Analysis complete!")
        # Clean up memory after processing
        cleanup_memory()
        return output_image, analysis_report, None, confidence_chart, feature_chart, heatmap
    except Exception as e:
        error_msg = f"Error while processing the image: {str(e)}"
        print(error_msg)
        # Clean up memory after an error
        cleanup_memory()
        raise gr.Error(error_msg)

def display_history():
    try:
        reports = get_report_history()
        history_html = "<div class='history-container'><h3>Analysis History</h3>"
        for report in reports:
            history_html += f"""
            <div class='history-item'>
                <p><strong>Report ID:</strong> {report.report_id}</p>
                <p><strong>Defect Type:</strong> {report.defect_type}</p>
                <p><strong>Description:</strong> {report.description}</p>
                <p><strong>Created:</strong> {report.created_at}</p>
            </div>
            """
        history_html += "</div>"
        return history_html
    except Exception as e:
        raise gr.Error(f"Error retrieving history: {str(e)}")

def get_memory_usage():
    """Get current memory usage in MB"""
    process = psutil.Process()
    memory_info = process.memory_info()
    return memory_info.rss / 1024 / 1024

def get_peak_memory_usage():
    """Get peak memory usage in MB"""
    try:
        process = psutil.Process()
        memory_info = process.memory_info()
        if hasattr(memory_info, 'peak_wset'):
            # Windows exposes the peak working set directly
            return memory_info.peak_wset / 1024 / 1024
        else:
            # On Linux, /proc/self/status reports the peak resident set size
            with open('/proc/self/status') as f:
                for line in f:
                    if line.startswith('VmHWM:'):
                        return float(line.split()[1]) / 1024  # Convert KB to MB
    except Exception:
        pass
    return 0

def get_gpu_utilization():
    """Get GPU utilization percentage"""
    try:
        if torch.cuda.is_available():
            return torch.cuda.utilization() if hasattr(torch.cuda, 'utilization') else 0
    except Exception:
        pass
    return 0

def log_memory_usage(stage=""):
    """Log current memory usage"""
    mem_usage = get_memory_usage()
    peak_mem = get_peak_memory_usage()
    gpu_util = get_gpu_utilization()
    print(f"Memory usage at {stage}: {mem_usage:.2f}MB (Peak: {peak_mem:.2f}MB, GPU: {gpu_util:.2f}%)")

def toggle_dark_mode():
    """Track dark-mode state. A Gradio theme cannot be swapped on a running
    app, so the visual switch happens client-side via the js handler attached
    to dark_mode_btn below."""
    global DARK_MODE
    DARK_MODE = not DARK_MODE

def get_space_upgrade_url():
    """Get the URL for upgrading the Space"""
    if not is_running_in_space():
        return "#"
    space_id = os.environ.get("SPACE_ID", "")
    if not space_id:
        return "https://huggingface.co/pricing"
    # Extract username and space name
    parts = space_id.split("/")
    if len(parts) != 2:
        return "https://huggingface.co/pricing"
    username, space_name = parts
    return f"https://huggingface.co/spaces/{username}/{space_name}/settings"

def get_local_installation_instructions():
    """Get instructions for running the app locally"""
    required_memory = estimate_model_memory_requirements()
    repo_url = get_repository_url()
    repo_dir = get_directory_name_from_repo_url(repo_url)
    return f"""
## Running Locally

To run this application locally with the full model:

1. Clone the repository:
```bash
git clone {repo_url}
cd {repo_dir}
```

2. Install dependencies:
```bash
pip install -r requirements.txt
```

3. Set your Hugging Face token as an environment variable:
```bash
export HF_TOKEN=your_huggingface_token
```

4. Run the application:
```bash
python app.py
```

Make sure your system has at least {required_memory/1024:.1f}GB of RAM for optimal performance.
"""

def get_model_card_url():
    """Get the URL for the model card"""
    return f"https://huggingface.co/{MODEL_NAME}"

def has_enough_memory_for_model():
    """Check if we have enough memory for the model"""
    required_memory = estimate_model_memory_requirements()
    if is_running_in_space():
        # In Spaces, we need to be more cautious
        hardware_memory = get_space_hardware_memory() * 1024  # Convert GB to MB
        print(f"Space hardware memory: {hardware_memory}MB, Required: {required_memory:.2f}MB")
        return hardware_memory >= required_memory
    else:
        # For local development, check available memory
        available_memory = check_available_memory()
        print(f"Available memory: {available_memory:.2f}MB, Required: {required_memory:.2f}MB")
        return available_memory >= required_memory

def get_repository_url():
    """Get the URL for the repository"""
    if is_running_in_space():
        space_id = os.environ.get("SPACE_ID", "")
        if space_id:
            # Space ID is in the format "username/spacename"
            return f"https://huggingface.co/spaces/{space_id}"
    return "https://huggingface.co/spaces/xingqiang/radar-analysis"

def get_directory_name_from_repo_url(repo_url):
    """Get the directory name from the repository URL"""
    # Extract the last part of the URL
    parts = repo_url.rstrip('/').split('/')
    return parts[-1]

# Launch the interface
def launch():
    """Launch the Gradio interface"""
    if is_running_in_space():
        # In Spaces, use a minimal resource configuration to avoid memory issues
        logger.info("Launching in Spaces with a minimal resource configuration")
        iface.launch(
            share=False,
            server_name="0.0.0.0",
            server_port=7860,
            max_threads=4,  # Reduced from 10 to 4
            show_error=True,
            quiet=False
        )
    else:
        # For local development, use the default settings
        iface.launch()

# Run the startup diagnostics before building the UI so USE_DEMO_MODE is final
run_startup_checks()

# Create Gradio interface
with gr.Blocks(theme=THEME) as iface:
    with gr.Row():
        gr.Markdown("# Radar Image Analysis System")
        dark_mode_btn = gr.Button("🌓 Toggle Dark Mode", scale=0, elem_id="dark-mode-btn")
    # Model loading notice
    gr.Markdown("""
    ### ℹ️ Model Loading Notes
    - The model is downloaded and initialized only when you click the "Analyze" button
    - The first analysis may take a while because the model has to be downloaded
    - If there is not enough memory, the system automatically switches to demo mode
    """, elem_id="model-loading-notice")
    if USE_DEMO_MODE:
        hardware_type = get_space_hardware_type() if is_running_in_space() else "N/A"
        hardware_tier = get_space_hardware_tier() if is_running_in_space() else "N/A"
        hardware_memory = get_space_hardware_memory() if is_running_in_space() else 0
        total_memory = get_total_system_memory()
        required_memory = estimate_model_memory_requirements()
        recommended_tier = get_recommended_space_tier()
        upgrade_url = get_space_upgrade_url()
        model_card_url = get_model_card_url()
        memory_info = f"Space hardware: {hardware_type} (Tier: {hardware_tier}, Memory: {hardware_memory}GB)"
        model_req = f"The [PaliGemma model]({model_card_url}) needs about {required_memory/1024:.1f}GB of memory when loaded with 8-bit quantization"
        gr.Markdown(f"""
        ### ⚠️ Running in Demo Mode

        Due to memory constraints, the application is currently running in demo mode:

        1. **Memory error**: The Space hit a "Memory limit exceeded (16Gi)" error
           - {memory_info}
           - Total system memory: {total_memory:.2f}MB
           - {model_req}
        2. **Solutions**:
           - Demo mode provides simulated results for demonstration purposes
           - To use the full model, run this application locally with {required_memory/1024:.1f}GB+ of memory
           - Or [upgrade to the {recommended_tier} Space tier]({upgrade_url}) or higher

        Demo mode still provides the full UI and all visualization features.
        """, elem_id="demo-mode-warning")
    gr.Markdown("Upload a radar image to analyze defects and generate a technical report")
    with gr.Tabs() as tabs:
        with gr.TabItem("Analysis", id="analysis"):
            with gr.Row():
                with gr.Column(scale=1):
                    with gr.Accordion("Input", open=True):
                        input_image = gr.Image(
                            type="pil",
                            label="Upload Radar Image",
                            elem_id="input-image",
                            sources=["upload", "webcam", "clipboard"]
                        )
                        tech_report_checkbox = gr.Checkbox(
                            label="Generate Technical Report",
                            value=False,
                            info="Create a detailed technical analysis report"
                        )
                        analyze_button = gr.Button(
                            "Analyze",
                            variant="primary",
                            elem_id="analyze-btn"
                        )
                with gr.Column(scale=2):
                    with gr.Accordion("Detection Results", open=True):
                        output_image = gr.Image(
                            type="pil",
                            label="Detection Results",
                            elem_id="output-image"
                        )
                    with gr.Accordion("Analysis Report", open=True):
                        output_report = gr.HTML(
                            label="Analysis Report",
                            elem_id="analysis-report"
                        )
                        tech_report_output = gr.File(
                            label="Technical Report",
                            elem_id="tech-report"
                        )
            with gr.Row():
                with gr.Column():
                    confidence_plot = gr.Plot(
                        label="Confidence Scores",
                        elem_id="confidence-plot"
                    )
                with gr.Column():
                    feature_plot = gr.Plot(
                        label="Feature Analysis",
                        elem_id="feature-plot"
                    )
            with gr.Row():
                heatmap_plot = gr.Plot(
                    label="Signal Intensity Heatmap",
                    elem_id="heatmap-plot"
                )
        with gr.TabItem("History", id="history"):
            with gr.Row():
                history_button = gr.Button("Refresh History")
            history_output = gr.HTML(elem_id="history-output")
        with gr.TabItem("Help", id="help"):
            gr.Markdown(f"""
            ## How to Use This Tool

            1. **Upload an image**: Click the upload button and select a radar image to analyze
            2. **Generate technical report** (optional): Check this box if you need a detailed technical report
            3. **Analyze**: Click the Analyze button to process the image
            4. **Review the results**:
               - The detection visualization shows the identified defects
               - The analysis report summarizes the findings
               - The technical report (if requested) provides detailed metrics
               - The charts visualize confidence scores and feature analysis

            ## About the Model

            This system uses [PaliGemma]({get_model_card_url()}), a vision-language model that combines SigLIP-So400m (image encoder) and Gemma-2B (text decoder) for joint object detection and multimodal analysis.

            The model is fine-tuned for radar image analysis and can detect various types of defects and anomalies in structural inspection images.
            """)
            if USE_DEMO_MODE and is_running_in_space():
                gr.Markdown(get_local_installation_instructions())
            gr.Markdown("""
            ## Keyboard Shortcuts

            - **Ctrl+A**: Trigger analysis
            - **Ctrl+D**: Toggle dark mode

            ## Troubleshooting

            - If the analysis fails, try uploading a different image format
            - Make sure the image is a valid radar scan
            - For technical issues, check the console logs
            """)
    # Set up event handlers
    # Dark mode is toggled client-side by flipping the `dark` class on the
    # body; a theme object cannot be swapped into a running Blocks app
    dark_mode_btn.click(
        fn=toggle_dark_mode,
        inputs=[],
        outputs=[],
        js="() => { document.body.classList.toggle('dark'); }",
        api_name="toggle_theme"
    )
    analyze_button.click(
        fn=process_image_streaming,
        inputs=[input_image, tech_report_checkbox],
        outputs=[output_image, output_report, tech_report_output, confidence_plot, feature_plot, heatmap_plot],
        api_name="analyze"
    )
    history_button.click(
        fn=display_history,
        inputs=[],
        outputs=[history_output],
        api_name="history"
    )
    # Add keyboard shortcuts (buttons are selected by elem_id; the CSS
    # :contains() pseudo-class is jQuery-only and does not work in
    # document.querySelector)
    iface.load(None, None, None, js="""
    () => {
        document.addEventListener('keydown', (e) => {
            if (e.key === 'a' && e.ctrlKey) {
                document.getElementById('analyze-btn').click();
            }
            if (e.key === 'd' && e.ctrlKey) {
                document.getElementById('dark-mode-btn').click();
            }
        });
    }
    """)

# Launch the interface
launch()