import json
import os
import shutil
import uuid
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

from pydantic import BaseModel
|
|
class ProcessingType(str, Enum):
    """Kinds of processing recorded for a derived video version.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so members can be looked up from, and compared with, raw strings.
    """

    TRANSCRIPT = "transcript"
    CROP = "crop"
    EFFECTS = "effects"
    AUDIO = "audio"
    COMBINED = "combined"
|
|
class VersionStatus(str, Enum):
    """Lifecycle states a video version can be in.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so members can be looked up from, and compared with, raw strings.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
|
|
class VideoVersion(BaseModel):
    """Registry record describing one derived version of an original video."""

    version_id: str  # unique identifier of this version
    original_id: str  # identifier of the original video this version belongs to
    version_name: str  # caller-supplied label for the version
    processing_type: ProcessingType
    file_path: str  # path of this version's file on disk
    file_size: int  # size of the file in bytes (taken from ``stat().st_size``)
    duration: float  # NOTE(review): unit not shown here, presumably seconds - confirm
    resolution: str  # e.g. "1920x1080", or "unknown"
    created_at: datetime
    parent_version: Optional[str] = None  # version this one was derived from, if any
    processing_config: Dict[str, Any]  # free-form processing settings
    status: VersionStatus
    metadata: Dict[str, Any] = {}
|
|
class OriginalVideo(BaseModel):
    """Registry record describing an uploaded original video."""

    original_id: str  # unique identifier of the original
    file_name: str  # name of the stored file
    file_path: str  # path of the stored copy on disk
    file_size: int  # size of the file in bytes (taken from ``stat().st_size``)
    upload_date: datetime
    duration: float  # NOTE(review): unit not shown here, presumably seconds - confirm
    resolution: str  # e.g. "1920x1080", or "unknown"
    format: str  # container/format label, e.g. "mp4", or "unknown"
    metadata: Dict[str, Any] = {}
|
|
class VersionManager:
    """Store original videos and their derived versions on disk.

    Files are kept under ``<base>/originals/<original_id>/`` and
    ``<base>/versions/<version_id>/``. Bookkeeping lives in a JSON registry
    (``<base>/version_registry.json``) with three sections:

    * ``originals``    - original_id -> serialized ``OriginalVideo``
    * ``versions``     - version_id -> serialized ``VideoVersion``
    * ``version_tree`` - node id (original or version) -> list of direct
      child version ids
    """

    # Sections every loaded registry is guaranteed to contain.
    _REGISTRY_SECTIONS = ("originals", "versions", "version_tree")

    def __init__(self, base_directory: str = "video_storage"):
        """Create the storage directories if needed and load the registry.

        Args:
            base_directory: Root directory for originals, versions and the
                registry file.
        """
        self.base_dir = Path(base_directory)
        self.originals_dir = self.base_dir / "originals"
        self.versions_dir = self.base_dir / "versions"
        self.registry_file = self.base_dir / "version_registry.json"

        self.originals_dir.mkdir(parents=True, exist_ok=True)
        self.versions_dir.mkdir(parents=True, exist_ok=True)

        self.registry = self._load_registry()

    def _load_registry(self) -> Dict[str, Any]:
        """Load the version registry from disk.

        Returns:
            The registry dict. Missing sections are filled with empty dicts so
            the rest of the class can index them unconditionally.

        Raises:
            ValueError: If the registry file is not valid JSON
                (``json.JSONDecodeError``) or does not hold a JSON object.
        """
        registry: Dict[str, Any] = {}
        if self.registry_file.exists():
            with open(self.registry_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError(
                    f"Registry file {self.registry_file} does not contain a JSON object"
                )
            registry = loaded
        for section in self._REGISTRY_SECTIONS:
            registry.setdefault(section, {})
        return registry

    def _save_registry(self):
        """Persist the version registry to disk.

        The data is written to a sibling temporary file and then moved over
        the registry with ``os.replace``, so an interrupted write cannot leave
        a truncated registry behind.
        """
        self.base_dir.mkdir(parents=True, exist_ok=True)
        tmp_path = self.registry_file.with_name(self.registry_file.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                # default=str serializes datetimes (and anything else that is
                # not natively JSON) as their string form.
                json.dump(self.registry, f, indent=2, default=str)
            os.replace(tmp_path, self.registry_file)
        finally:
            # Only still present when the dump or the replace failed.
            if tmp_path.exists():
                tmp_path.unlink()

    def register_original(self, source_path: str, file_name: str,
                          metadata: Optional[Dict] = None) -> str:
        """Register an original video by copying it into protected storage.

        Args:
            source_path: Path of the file to copy.
            file_name: Name to store the file under. Only the final path
                component is used, so the copy always lands inside the
                original's own directory.
            metadata: Optional free-form metadata stored with the record.

        Returns:
            The newly generated original id.

        Raises:
            ValueError: If ``file_name`` has no usable final component.
            OSError: If the source file cannot be copied.
        """
        safe_name = Path(file_name).name
        if not safe_name or safe_name == "..":
            raise ValueError(f"Invalid file name: {file_name!r}")

        original_id = str(uuid.uuid4())
        original_dir = self.originals_dir / original_id
        original_dir.mkdir(parents=True, exist_ok=True)
        original_path = original_dir / safe_name

        try:
            shutil.copy2(source_path, original_path)
            video_info = self._get_video_info(original_path)
            original_video = OriginalVideo(
                original_id=original_id,
                file_name=safe_name,
                file_path=str(original_path),
                file_size=original_path.stat().st_size,
                upload_date=datetime.now(),
                duration=video_info.get('duration', 0),
                resolution=video_info.get('resolution', 'unknown'),
                format=video_info.get('format', 'unknown'),
                metadata=metadata or {},
            )
        except Exception:
            # Do not leave a half-populated directory with no registry entry.
            shutil.rmtree(original_dir, ignore_errors=True)
            raise

        self.registry["originals"][original_id] = original_video.dict()
        self.registry["version_tree"][original_id] = []
        self._save_registry()

        return original_id

    def create_version(self, original_id: str, version_name: str,
                       processing_type: "ProcessingType",
                       processing_config: Dict[str, Any],
                       parent_version: Optional[str] = None) -> str:
        """Create a new version from the original video or an existing version.

        The source file (the parent version's file when ``parent_version`` is
        given, otherwise the original's file) is copied into a fresh version
        directory and registered with status ``PENDING``.

        Args:
            original_id: Id of the original the version belongs to.
            version_name: Label for the version; also used as file-name prefix.
            processing_type: Kind of processing this version represents.
            processing_config: Free-form processing settings stored as-is.
            parent_version: Optional id of the version to derive from.

        Returns:
            The newly generated version id.

        Raises:
            ValueError: If the original or the parent version is unknown, or
                the parent version belongs to a different original.
            OSError: If the source file cannot be copied.
        """
        originals = self.registry["originals"]
        versions = self.registry["versions"]

        # Validate everything before touching the file system so a bad
        # argument never leaves an empty version directory behind.
        if original_id not in originals:
            raise ValueError(f"Original video {original_id} not found")

        if parent_version:
            if parent_version not in versions:
                raise ValueError(f"Parent version {parent_version} not found")
            parent_record = versions[parent_version]
            if parent_record["original_id"] != original_id:
                raise ValueError(
                    f"Parent version {parent_version} does not belong to "
                    f"original video {original_id}"
                )
            source_path = parent_record["file_path"]
            tree_key = parent_version
        else:
            source_path = originals[original_id]["file_path"]
            tree_key = original_id

        original_name = originals[original_id]["file_name"]
        # Keep only the final component so path separators in version_name
        # cannot move the file outside the version directory.
        target_name = Path(f"{version_name}_{original_name}").name

        version_id = str(uuid.uuid4())
        version_dir = self.versions_dir / version_id
        version_dir.mkdir(parents=True, exist_ok=True)
        version_path = version_dir / target_name

        try:
            shutil.copy2(source_path, version_path)
            video_info = self._get_video_info(version_path)
            video_version = VideoVersion(
                version_id=version_id,
                original_id=original_id,
                version_name=version_name,
                processing_type=processing_type,
                file_path=str(version_path),
                file_size=version_path.stat().st_size,
                duration=video_info.get('duration', 0),
                resolution=video_info.get('resolution', 'unknown'),
                created_at=datetime.now(),
                parent_version=parent_version,
                processing_config=processing_config,
                status=VersionStatus.PENDING,
            )
        except Exception:
            # Do not leave a half-populated directory with no registry entry.
            shutil.rmtree(version_dir, ignore_errors=True)
            raise

        versions[version_id] = video_version.dict()
        self.registry["version_tree"].setdefault(tree_key, []).append(version_id)
        self._save_registry()

        return version_id

    def get_original(self, original_id: str) -> Optional["OriginalVideo"]:
        """Return the record of an original video, or ``None`` if unknown."""
        record = self.registry["originals"].get(original_id)
        if record is None:
            return None
        return OriginalVideo(**record)

    def get_version(self, version_id: str) -> Optional["VideoVersion"]:
        """Return the record of a version, or ``None`` if unknown."""
        record = self.registry["versions"].get(version_id)
        if record is None:
            return None
        return VideoVersion(**record)

    def get_version_tree(self, original_id: str) -> List[str]:
        """Return the ids of the direct children of a tree node.

        Args:
            original_id: Id of an original video (a version id works too,
                since both are keys of the ``version_tree`` section).

        Returns:
            A new list of child version ids; empty when the node is unknown.
            It is a copy, so mutating it does not alter the registry.
        """
        return list(self.registry["version_tree"].get(original_id, []))

    def get_all_versions(self, original_id: str) -> List["VideoVersion"]:
        """Return every version of an original, oldest first."""
        matching = [
            VideoVersion(**record)
            for record in self.registry["versions"].values()
            if record["original_id"] == original_id
        ]
        return sorted(matching, key=lambda version: version.created_at)

    def update_version_status(self, version_id: str, status: "VersionStatus"):
        """Set the status of a version and persist it.

        Unknown version ids are ignored.
        """
        if version_id in self.registry["versions"]:
            self.registry["versions"][version_id]["status"] = status
            self._save_registry()

    def delete_version(self, version_id: str) -> bool:
        """Delete a version together with its file.

        File-system problems are reported but do not abort the deletion; the
        registry entry is removed regardless. Versions derived from the
        deleted one are kept and re-attached to the deleted version's parent
        node, so they stay reachable from the root of the version tree.

        Returns:
            ``True`` if the version existed and was removed, else ``False``.
        """
        versions = self.registry["versions"]
        version_data = versions.pop(version_id, None)
        if version_data is None:
            return False

        file_path = Path(version_data["file_path"])
        try:
            file_path.unlink()
        except FileNotFoundError:
            pass  # already gone; still try to remove the directory below
        except OSError as e:
            print(f"Error deleting version files: {e}")
        try:
            file_path.parent.rmdir()
        except OSError as e:
            # Missing or non-empty directory: report and carry on.
            print(f"Error deleting version files: {e}")

        tree = self.registry["version_tree"]

        # Detach the version from whichever node lists it as a child.
        parent_key = None
        for node_id, children in tree.items():
            if version_id in children:
                children.remove(version_id)
                parent_key = node_id
                break

        # Re-attach the deleted version's own children one level up.
        orphans = tree.pop(version_id, [])
        if orphans:
            if parent_key is None:
                parent_key = (version_data.get("parent_version")
                              or version_data["original_id"])
            tree.setdefault(parent_key, []).extend(orphans)
            # parent_version only ever names a version, never an original.
            new_parent = parent_key if parent_key in versions else None
            for child_id in orphans:
                child_record = versions.get(child_id)
                if child_record is not None:
                    child_record["parent_version"] = new_parent

        self._save_registry()
        return True

    def get_original_path(self, original_id: str) -> Optional[str]:
        """Return the stored path of an original video (intended for read-only use).

        Returns ``None`` when the original is unknown.
        """
        record = self.registry["originals"].get(original_id)
        if record is None:
            return None
        return record["file_path"]

    def _get_video_info(self, video_path: Path) -> Dict[str, Any]:
        """Return duration, resolution and format information for a video.

        NOTE: this is a placeholder. The file is not inspected and the same
        fixed values are returned for every input.
        TODO: probe the actual file.
        """
        return {
            'duration': 0,
            'resolution': '1920x1080',
            'format': 'mp4'
        }

    def cleanup_orphaned_versions(self) -> List[str]:
        """Remove orphaned versions (files on disk without a registry record).

        Every directory directly under the versions directory whose name is
        not a registered version id is deleted recursively. A version being
        created at the same moment is not in the registry yet, so this must
        not run concurrently with ``create_version``.

        Returns:
            Paths of the directories that were removed.
        """
        removed: List[str] = []
        if not self.versions_dir.is_dir():
            return removed

        known_ids = set(self.registry["versions"])
        # Snapshot the listing first: entries are deleted while looping.
        for entry in sorted(self.versions_dir.iterdir()):
            if entry.is_dir() and entry.name not in known_ids:
                shutil.rmtree(entry, ignore_errors=True)
                if not entry.exists():
                    removed.append(str(entry))
        return removed

    @staticmethod
    def _stored_size(records: Dict[str, Any]) -> int:
        """Sum the on-disk sizes of the files referenced by registry records."""
        total = 0
        for record in records.values():
            try:
                total += Path(record["file_path"]).stat().st_size
            except OSError:
                # File removed or unreadable: it occupies no countable space.
                continue
        return total

    def get_storage_stats(self) -> Dict[str, Any]:
        """Return storage statistics.

        Files that are registered but missing on disk count as zero bytes
        instead of making the whole call fail.
        """
        originals_size = self._stored_size(self.registry["originals"])
        versions_size = self._stored_size(self.registry["versions"])

        return {
            "original_videos": len(self.registry["originals"]),
            "total_versions": len(self.registry["versions"]),
            "originals_size": originals_size,
            "versions_size": versions_size,
            "total_size": originals_size + versions_size
        }