| """ |
| Export Service for Creative Breakthrough |
| Handles bulk export of ad creatives with images and Excel data |
| """ |
|
|
| import os |
| import zipfile |
| import tempfile |
| import shutil |
| import re |
| from datetime import datetime |
| from typing import List, Dict, Any, Optional |
| import httpx |
| from openpyxl import Workbook |
| from openpyxl.styles import Font, Alignment |
| import logging |
|
|
| from config import settings |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class ExportService:
    """Service for exporting ad creatives in bulk.

    Builds a ZIP package containing the creative images (renamed to a
    consistent nomenclature) plus an Excel sheet of the associated ad copy.
    Work happens inside a temporary directory that is always cleaned up.
    """

    def __init__(self):
        # Working directory for the current export; created in
        # create_export_package() and removed when packaging finishes.
        self.temp_dir: Optional[str] = None

    def sanitize_filename(self, text: str, max_length: int = 50) -> str:
        """Sanitize ``text`` for safe use inside a filename.

        Lowercases the text, collapses every run of non-alphanumeric
        characters into a single underscore, trims leading/trailing
        underscores, and truncates to ``max_length`` characters.

        Returns ``"unknown"`` when the input is empty or reduces to nothing.
        """
        if not text:
            return "unknown"

        text = text.lower()
        # Collapse anything that is not [a-z0-9] into one underscore.
        text = re.sub(r'[^a-z0-9]+', '_', text)
        text = text.strip('_')

        if len(text) > max_length:
            text = text[:max_length]
        # Truncation may leave a dangling separator; drop it.
        text = text.rstrip('_')

        return text or "unknown"

    def generate_image_filename(
        self,
        ad: Dict[str, Any],
        version: int,
        date_str: str
    ) -> str:
        """
        Generate filename using nomenclature:
        {niche}_{concept}_{angle}_{date}_{version}.png

        Example: home_insurance_before_after_fear_20260130_001.png

        Each component falls back to "standard" when the ad record lacks
        the corresponding field; ``version`` is zero-padded to 3 digits.
        """
        niche = self.sanitize_filename(ad.get("niche", "standard"), max_length=20)

        concept = self.sanitize_filename(
            ad.get("concept_name") or ad.get("concept_key") or "standard",
            max_length=20
        )

        angle = self.sanitize_filename(
            ad.get("angle_name") or
            ad.get("angle_key") or
            ad.get("psychological_angle") or
            "standard",
            max_length=20
        )

        version_str = f"{version:03d}"

        return f"{niche}_{concept}_{angle}_{date_str}_{version_str}.png"

    async def download_image(self, image_url: str) -> Optional[bytes]:
        """Download an image and return its raw bytes, or ``None`` on failure.

        Accepts either an absolute http(s) URL or a server-local path of the
        form "/images/<name>", resolved against ``settings.output_dir``.
        Failures are logged and reported as ``None`` (callers skip the ad).
        """
        try:
            if not image_url.startswith(("http://", "https://")):
                # BUG FIX: the previous lstrip("/images/") stripped a
                # *character set* ({/, i, m, a, g, e, s}), corrupting any
                # filename starting with one of those letters. Strip the
                # literal prefix instead.
                prefix = "/images/"
                if image_url.startswith(prefix):
                    relative = image_url[len(prefix):]
                else:
                    relative = image_url.lstrip("/")
                local_path = os.path.join(settings.output_dir, relative)
                if os.path.exists(local_path):
                    with open(local_path, "rb") as f:
                        return f.read()
                logger.warning(f"Local file not found: {local_path}")
                return None

            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(image_url)
                response.raise_for_status()
                return response.content
        except Exception as e:
            # Best-effort download: log and signal failure with None.
            logger.error(f"Failed to download image from {image_url}: {e}")
            return None

    async def download_and_rename_images(
        self,
        ads: List[Dict[str, Any]],
        output_dir: str
    ) -> Dict[str, str]:
        """
        Download all images and rename them according to nomenclature.
        Returns mapping of ad_id -> new_filename.

        Ads with no image URL or a failed download are logged and skipped;
        they simply do not appear in the returned mapping.
        """
        filename_map = {}
        date_str = datetime.now().strftime("%Y%m%d")

        for idx, ad in enumerate(ads, start=1):
            ad_id = ad.get("id")

            # Prefer the CDN (R2) URL; fall back to the raw image URL.
            image_url = ad.get("r2_url") or ad.get("image_url")

            if not image_url:
                logger.warning(f"No image URL for ad {ad_id}, skipping")
                continue

            new_filename = self.generate_image_filename(ad, idx, date_str)

            logger.info(f"Downloading image {idx}/{len(ads)}: {image_url}")
            image_bytes = await self.download_image(image_url)

            if not image_bytes:
                logger.warning(f"Failed to download image for ad {ad_id}, skipping")
                continue

            output_path = os.path.join(output_dir, new_filename)
            with open(output_path, "wb") as f:
                f.write(image_bytes)

            filename_map[ad_id] = new_filename
            logger.info(f"Saved: {new_filename}")

        return filename_map

    def create_excel_sheet(
        self,
        ads: List[Dict[str, Any]],
        filename_map: Dict[str, str],
        output_path: str
    ):
        """
        Create Excel sheet with ad copy data.
        Columns: Image Filename, Headline, Title, Description, CTA, Psychological Angle

        ``filename_map`` (ad_id -> filename) links each row to its image;
        rows whose ad is missing from the map get "N/A".
        """
        wb = Workbook()
        ws = wb.active
        ws.title = "Ad Copy Data"

        headers = [
            "Image Filename",
            "Image URL",
            "Headline",
            "Title",
            "Description",
            "CTA",
            "Psychological Angle",
            "Niche",
            "Created Date"
        ]

        # Bold, centered header row.
        for col_idx, header in enumerate(headers, start=1):
            cell = ws.cell(row=1, column=col_idx, value=header)
            cell.font = Font(bold=True)
            cell.alignment = Alignment(horizontal="center", vertical="center")

        for row_idx, ad in enumerate(ads, start=2):
            ad_id = ad.get("id")
            filename = filename_map.get(ad_id, "N/A")
            image_url = ad.get("r2_url") or ad.get("image_url") or ""

            row_data = [
                filename,
                image_url,
                ad.get("headline", ""),
                ad.get("title", ""),
                ad.get("description", ""),
                ad.get("cta", ""),
                ad.get("psychological_angle", ""),
                ad.get("niche", ""),
                # Keep only the date portion (YYYY-MM-DD) of the timestamp.
                ad.get("created_at", "")[:10] if ad.get("created_at") else ""
            ]

            for col_idx, value in enumerate(row_data, start=1):
                ws.cell(row=row_idx, column=col_idx, value=value)

        # Auto-size each column to its widest cell, capped at 50 chars.
        # (Bare except removed: str()/len() cannot raise here.)
        for column in ws.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                if cell.value:
                    max_length = max(max_length, len(str(cell.value)))
            ws.column_dimensions[column_letter].width = min(max_length + 2, 50)

        # Keep the header visible when scrolling.
        ws.freeze_panes = "A2"

        wb.save(output_path)
        logger.info(f"Excel sheet created: {output_path}")

    async def create_export_package(
        self,
        ads: List[Dict[str, Any]]
    ) -> str:
        """
        Create a complete export package with images and Excel sheet.
        Returns path to the ZIP file.

        Raises RuntimeError when no image could be downloaded. The
        temporary working directory is removed in all cases; the returned
        ZIP lives in the system temp dir and must be cleaned up by the
        caller via cleanup_zip().
        """
        self.temp_dir = tempfile.mkdtemp(prefix="export_")

        try:
            creatives_dir = os.path.join(self.temp_dir, "creatives")
            os.makedirs(creatives_dir, exist_ok=True)

            logger.info(f"Downloading {len(ads)} images...")
            filename_map = await self.download_and_rename_images(ads, creatives_dir)

            if not filename_map:
                # RuntimeError (an Exception subclass) so existing
                # `except Exception` callers still catch it.
                raise RuntimeError("No images were successfully downloaded")

            excel_path = os.path.join(self.temp_dir, "ad_copy_data.xlsx")
            logger.info("Creating Excel sheet...")
            self.create_excel_sheet(ads, filename_map, excel_path)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            zip_filename = f"creatives_export_{timestamp}.zip"
            zip_path = os.path.join(tempfile.gettempdir(), zip_filename)

            logger.info(f"Creating ZIP file: {zip_filename}")
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for filename in os.listdir(creatives_dir):
                    file_path = os.path.join(creatives_dir, filename)
                    zipf.write(file_path, os.path.join("creatives", filename))

                zipf.write(excel_path, "ad_copy_data.xlsx")

            logger.info(f"Export package created successfully: {zip_path}")
            return zip_path

        except Exception as e:
            logger.error(f"Failed to create export package: {e}")
            raise
        finally:
            # Always remove the scratch directory, even on failure.
            if self.temp_dir and os.path.exists(self.temp_dir):
                try:
                    shutil.rmtree(self.temp_dir)
                except Exception as e:
                    logger.warning(f"Failed to cleanup temp directory: {e}")

    def cleanup_zip(self, zip_path: str):
        """Clean up the ZIP file after it's been sent."""
        try:
            if os.path.exists(zip_path):
                os.remove(zip_path)
                logger.info(f"Cleaned up ZIP file: {zip_path}")
        except Exception as e:
            logger.warning(f"Failed to cleanup ZIP file: {e}")
|
|
|
|
| |
# Module-level singleton instance of the export service.
export_service = ExportService()
|
|