""" Campaign Data Processor for Kickstarter Prediction This module handles the preprocessing of raw Kickstarter campaign data, generating text embeddings and preparing numerical features for prediction. Key functionality: - Longformer embeddings for project descriptions - Sentence transformer embeddings for blurbs and risk statements - GloVe embeddings for categories and countries - Normalization of numerical features Author: Angus Fung Date: April 2025 """ import os # Set gensim data directory to a writable location at the very start os.environ['GENSIM_DATA_DIR'] = '/tmp/gensim-data' try: os.makedirs('/tmp/gensim-data', exist_ok=True) print(f"Created directory at {os.environ['GENSIM_DATA_DIR']}") except Exception as e: print(f"Error creating gensim directory: {str(e)}") import json import numpy as np from typing import Dict, List, Tuple, Any, Optional import torch from transformers import AutoTokenizer, AutoModel import gc import gensim.downloader class CampaignProcessor: """ Processor for Kickstarter campaign data. This class handles the preprocessing of raw campaign data, transforming text and categorical features into embeddings using various NLP models and preparing numerical features for the prediction model. """ def __init__(self, data: List[Dict], lazy_load: bool = False): """ Initialize the CampaignProcessor. Args: data (List[Dict]): List of campaign dictionaries to process lazy_load (bool): If True, models will be loaded only when needed rather than at initialization time """ self.data = data self.categories = sorted(list(set(camp.get('raw_category', '') for camp in self.data))) self.lazy_load = lazy_load # Country name to ISO alpha-2 code mapping # First letter of each word is capitalized in the original data self.country_to_alpha2 = { # Main ISO country names "United States": "US", "United Kingdom": "GB", "Canada": "CA", "Australia": "AU", "New Zealand": "NZ", "Germany": "DE", "France": "FR", "Italy": "IT", "Spain": "ES", "Netherlands": "NL", "Sweden": "SE", "Denmark": "DK", "Norway": "NO", "Ireland": "IE", "Switzerland": "CH", "Austria": "AT", "Belgium": "BE", "Luxembourg": "LU", "Hong Kong": "HK", "Singapore": "SG", "Mexico": "MX", "Japan": "JP", "China": "CN", "Brazil": "BR", "India": "IN", "South Korea": "KR", "South Africa": "ZA", "Argentina": "AR", "Poland": "PL", "Portugal": "PT", "Russia": "RU", "Greece": "GR", "Czech Republic": "CZ", "Finland": "FI", "Hungary": "HU", "Romania": "RO", "Thailand": "TH", "Turkey": "TR", "Ukraine": "UA", "Colombia": "CO", "Chile": "CL", "Peru": "PE", "Malaysia": "MY", "Vietnam": "VN", "Indonesia": "ID", "Philippines": "PH", "United Arab Emirates": "AE", "Saudi Arabia": "SA", "Israel": "IL", "Egypt": "EG", "Nigeria": "NG", "Kenya": "KE", # Common variants and abbreviations "USA": "US", "U.S.A.": "US", "U.S.": "US", "UK": "GB", "U.K.": "GB", "Great Britain": "GB", "England": "GB", "Republic Of Korea": "KR", "Korea": "KR", "Republic Of China": "CN", "Republic Of India": "IN", "UAE": "AE", "Russia": "RU", "Russian Federation": "RU", "The Netherlands": "NL", "Holland": "NL", "Republic Of Ireland": "IE", "Czech": "CZ", "Czechia": "CZ", } # Create a lowercase version of the dictionary for case-insensitive lookups self.country_to_alpha2_lower = {k.lower(): v for k, v in self.country_to_alpha2.items()} # Initialize model variables (to be loaded later) self.tokenizer = None # Longformer tokenizer for descriptions self.model = None # Longformer model for descriptions self.RiskandBlurb_tokenizer = None # MiniLM tokenizer for blurb and risk 

    def _load_models(self):
        """
        Load all NLP models required for processing campaign data.

        This method loads:
        - Longformer for description embeddings
        - MiniLM for blurb and risk embeddings
        - GloVe for category and country embeddings

        Models are cached to avoid reloading and moved to the appropriate device.
        """
        print("Loading NLP models...")

        # Cache models locally to avoid downloading every time
        cache_dir = "/tmp/model_cache"
        os.environ["TRANSFORMERS_CACHE"] = cache_dir
        os.environ["HF_HOME"] = cache_dir
        try:
            os.makedirs(cache_dir, exist_ok=True)
            print(f"Created cache directory at {cache_dir}")
        except Exception as e:
            print(f"Error creating cache directory: {str(e)}")

        # Initialize the Longformer model and tokenizer (for processing the description)
        model_name = "allenai/longformer-base-4096"
        print(f"Loading {model_name}...")
        try:
            # Internet connectivity check
            try:
                import requests
                print("Testing internet connectivity...")
                response = requests.get("https://huggingface.co", timeout=5)
                if response.status_code == 200:
                    print("Successfully connected to huggingface.co")
                else:
                    print(f"Error connecting to huggingface.co: {response.status_code}")
            except Exception as e:
                print(f"Network connectivity test failed: {str(e)}")

            # Check whether the cache directory exists and is writable
            if os.path.exists(cache_dir):
                print(f"Cache directory {cache_dir} exists")
                if os.access(cache_dir, os.W_OK):
                    print(f"Cache directory {cache_dir} is writable")
                else:
                    print(f"Cache directory {cache_dir} is not writable")
            else:
                print(f"Cache directory {cache_dir} does not exist")

            # Load the tokenizer and model with an explicit cache_dir parameter
            print(f"Initializing tokenizer from {model_name} with cache_dir={cache_dir}")
            self.tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=cache_dir)
            print("Tokenizer loaded successfully")

            print(f"Initializing model from {model_name} with cache_dir={cache_dir}")
            self.model = AutoModel.from_pretrained(model_name, cache_dir=cache_dir)
            print("Model loaded successfully")
        except Exception as e:
            print(f"Error loading Longformer model: {str(e)}")
            raise

        try:
            # Initialize the MiniLM model and tokenizer (for processing risk and blurb)
            RiskandBlurb_model_name = "sentence-transformers/all-MiniLM-L6-v2"
            print(f"Loading {RiskandBlurb_model_name}...")
            self.RiskandBlurb_tokenizer = AutoTokenizer.from_pretrained(RiskandBlurb_model_name, cache_dir=cache_dir)
            self.RiskandBlurb_model = AutoModel.from_pretrained(RiskandBlurb_model_name, cache_dir=cache_dir)
            print("RiskandBlurb model loaded successfully")
        except Exception as e:
            print(f"Error loading MiniLM model: {str(e)}")
            raise

        try:
            # Load the GloVe model for country and subcategory embeddings
            print("Loading GloVe model...")
            # GENSIM_DATA_DIR is already set at the top of the file
            print(f"Using GENSIM_DATA_DIR: {os.environ.get('GENSIM_DATA_DIR', 'Not set')}")
            self.glove = gensim.downloader.load('glove-wiki-gigaword-100')
            print("GloVe model loaded successfully")
        except Exception as e:
            print(f"Error loading GloVe model: {str(e)}")
            raise

        try:
            # Move models to the target device
            self.model = self.model.to(self.device)
            self.RiskandBlurb_model = self.RiskandBlurb_model.to(self.device)
            print("All models loaded successfully.")
        except Exception as e:
            print(f"Error moving models to device: {str(e)}")
            raise

    def _ensure_models_loaded(self):
        """
        Ensure all required models are loaded.

        This is called before any processing to make sure models are ready,
        which is particularly important when using lazy loading.
        """
        if (self.model is None or self.tokenizer is None
                or self.RiskandBlurb_model is None or self.RiskandBlurb_tokenizer is None
                or self.glove is None):
            self._load_models()

    def _process_text_embedding(self, text: str, max_length: int,
                                tokenizer: AutoTokenizer, model: AutoModel) -> np.ndarray:
        """
        Generate an embedding for text using the specified model and tokenizer.

        This method handles tokenization, model inference, and pooling to
        create a single vector representation of the input text.

        Args:
            text (str): Text to embed
            max_length (int): Maximum token length for the model
            tokenizer (AutoTokenizer): Tokenizer to use
            model (AutoModel): Model to use for embedding generation

        Returns:
            np.ndarray: Embedding vector for the text
        """
        # Clean up memory before processing
        if self.device.type == 'cuda':
            torch.cuda.empty_cache()
        gc.collect()

        # Tokenize the text
        inputs = tokenizer(text, padding=True, truncation=True,
                           max_length=max_length, return_tensors="pt")

        # Move inputs to the device
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # Generate embeddings
        with torch.no_grad():
            outputs = model(**inputs)

        # Mean pooling: average all token embeddings, weighted by the attention mask
        attention_mask = inputs['attention_mask']
        token_embeddings = outputs.last_hidden_state
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        sentence_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
            input_mask_expanded.sum(1), min=1e-9)

        # Convert to a numpy array
        embedding = sentence_embeddings.cpu().numpy()

        # Clean up to prevent memory leaks
        del inputs, outputs, token_embeddings, sentence_embeddings
        if self.device.type == 'cuda':
            torch.cuda.empty_cache()
        gc.collect()

        return embedding[0]

    def _get_glove_embedding(self, text: str, dim: int = 100) -> np.ndarray:
        """
        Generate a GloVe embedding for a text by averaging word vectors.

        Args:
            text (str): Text to embed
            dim (int): Dimension of the GloVe embeddings

        Returns:
            np.ndarray: Averaged GloVe embedding for the text
        """
        if not text:
            return np.zeros(dim)

        # Normalize and split the text
        text = text.lower().strip()
        words = text.split()
        vectors = []

        # Collect vectors for words that exist in the vocabulary
        for word in words:
            if word in self.glove:
                vectors.append(self.glove[word])

        # Average the vectors if any exist, otherwise return zeros
        if vectors:
            return np.mean(vectors, axis=0)
        return np.zeros(dim)
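
    # Note on embedding sizes, matching the zero-vector fallbacks used in the
    # methods below: Longformer pooling yields 768-dimensional vectors, MiniLM
    # yields 384-dimensional vectors, and glove-wiki-gigaword-100 yields
    # 100-dimensional vectors.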

    def process_description_embedding(self, campaign: Dict, idx: int) -> Tuple[np.ndarray, int]:
        """
        Process the project description to generate a Longformer embedding.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            Tuple containing:
            - np.ndarray: Longformer embedding of the description
            - int: Word count of the description
        """
        self._ensure_models_loaded()
        try:
            text = campaign.get("raw_description", '')
            description_length = len(text.split())
            embedding = self._process_text_embedding(text, 4096, self.tokenizer, self.model)
            return embedding, description_length
        except Exception as e:
            print(f"Error processing description: {str(e)}")
            return np.zeros(768), 0

    def process_riskandchallenges_embedding(self, campaign: Dict, idx: int) -> np.ndarray:
        """
        Process the risks and challenges section to generate a MiniLM embedding.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            np.ndarray: MiniLM embedding of the risks section
        """
        self._ensure_models_loaded()
        try:
            text = campaign.get("raw_risks", '')
            return self._process_text_embedding(text, 512, self.RiskandBlurb_tokenizer, self.RiskandBlurb_model)
        except Exception as e:
            print(f"Error processing risk statement: {str(e)}")
            return np.zeros(384)

    def process_blurb(self, campaign: Dict, idx: int) -> np.ndarray:
        """
        Process the project blurb to generate a MiniLM embedding.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            np.ndarray: MiniLM embedding of the blurb
        """
        self._ensure_models_loaded()
        try:
            text = campaign.get("raw_blurb", '')
            return self._process_text_embedding(text, 512, self.RiskandBlurb_tokenizer, self.RiskandBlurb_model)
        except Exception as e:
            print(f"Error processing blurb: {str(e)}")
            return np.zeros(384)

    def process_category(self, campaign: Dict) -> List[int]:
        """
        Process the project category into a one-hot encoding.

        Args:
            campaign (Dict): Campaign data

        Returns:
            List[int]: One-hot encoding of the category
        """
        try:
            # All categories in the dataset
            fixed_categories = [
                "Art", "Comics", "Crafts", "Dance", "Design", "Fashion",
                "Film & Video", "Food", "Games", "Journalism", "Music",
                "Photography", "Publishing", "Technology", "Theater"
            ]

            category = campaign.get('raw_category', '')

            # Create the one-hot encoding
            encoding = [1 if cat == category else 0 for cat in fixed_categories]
            return encoding
        except Exception as e:
            print(f"Error processing category: {str(e)}")
            return [0] * 15

    def process_subcategory_embedding(self, campaign: Dict, idx: int) -> np.ndarray:
        """
        Process the project subcategory to generate a GloVe embedding.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            np.ndarray: GloVe embedding of the subcategory
        """
        self._ensure_models_loaded()
        try:
            subcategory = campaign.get('raw_subcategory', '')
            return self._get_glove_embedding(subcategory)
        except Exception as e:
            print(f"Error processing subcategory: {str(e)}")
            return np.zeros(100)
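
    # Example of the one-hot encoding above (illustrative): a campaign with
    # raw_category "Games" maps to
    #   [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
    # since "Games" is the ninth entry in fixed_categories.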

    def _convert_country_to_alpha2(self, country_name: str) -> str:
        """
        Convert a country name to its ISO alpha-2 code.

        This helper method handles the conversion with logging:
        1. Tries an exact match first
        2. Falls back to a case-insensitive match
        3. Returns the original string if no match is found

        Args:
            country_name (str): Country name to convert

        Returns:
            str: ISO alpha-2 code (e.g., "US") or the original country name if no match
        """
        if not country_name:
            return ""

        # Try an exact match first
        alpha2_code = self.country_to_alpha2.get(country_name)

        # If there is no exact match, try a case-insensitive match
        if not alpha2_code:
            alpha2_code = self.country_to_alpha2_lower.get(country_name.lower())

        # Log the result
        if alpha2_code:
            print(f"Country conversion: '{country_name}' → '{alpha2_code}'")
            return alpha2_code
        print(f"Country conversion failed: '{country_name}' not found in dictionary")
        return country_name

    def process_country_embedding(self, campaign: Dict, idx: int) -> np.ndarray:
        """
        Process the project country to generate a GloVe embedding.

        This method:
        1. Extracts the country name from the campaign data
        2. Converts the full country name to an ISO alpha-2 code
           (e.g., "United States" → "US")
        3. Generates an embedding using GloVe for the standardized country code

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            np.ndarray: GloVe embedding of the country (as an alpha-2 code)
        """
        self._ensure_models_loaded()
        try:
            # Extract the country name from the campaign data
            country_name = campaign.get('raw_country', '')

            # Convert to an alpha-2 code using the helper method
            alpha2_code = self._convert_country_to_alpha2(country_name)

            # Generate the embedding using the standardized country code
            return self._get_glove_embedding(alpha2_code)
        except Exception as e:
            print(f"Error processing country for campaign {idx}: {str(e)}")
            return np.zeros(100)

    def process_funding_goal(self, campaign: Dict, idx: int) -> float:
        """
        Process the campaign funding goal with logarithmic compression.

        Applies a base-10 log1p transformation (log10(1 + x)) to compress
        extreme values while preserving relative differences between
        funding goals.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            float: The transformed funding goal
        """
        try:
            goal = float(campaign.get('funding_goal', 0))

            # log1p transformation: good general compression that preserves
            # relative differences
            transformed_goal = np.log1p(goal) / np.log(10)
            return transformed_goal
        except Exception as e:
            print(f"Error processing funding goal for campaign {idx}: {str(e)}")
            return 0.0
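
    # Worked example for the transform above (illustrative, not part of the
    # pipeline): a goal of 9,999 gives
    #   np.log1p(9999) / np.log(10) = ln(10000) / ln(10) = log10(10000) = 4.0
    # so goals spanning several orders of magnitude land in a compact range.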

    def process_previous_funding_goal(self, campaign: Dict, idx: int) -> float:
        """
        Process the previous campaign funding goal with logarithmic compression.

        Applies a base-10 log1p transformation (log10(1 + x)) to compress
        extreme values while preserving relative differences between
        previous funding goals.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            float: The transformed previous funding goal
        """
        try:
            # Get the raw value
            previous_goal = float(campaign.get('previous_funding_goal', 0))

            # Log the source value
            print(f"Previous funding goal raw value: {previous_goal}")

            # Apply the logarithmic transformation if the value is positive
            if previous_goal > 0:
                transformed_goal = np.log1p(previous_goal) / np.log(10)
                print(f"Applied log transformation to previous_funding_goal: {previous_goal} → {transformed_goal}")
                return transformed_goal
            print(f"No transformation applied to previous_funding_goal: value is {previous_goal}")
            return 0.0
        except Exception as e:
            print(f"Error processing previous funding goal for campaign {idx}: {str(e)}")
            return 0.0

    def process_previous_pledged(self, campaign: Dict, idx: int) -> float:
        """
        Process the previous campaign pledged amount with logarithmic compression.

        Applies a base-10 log1p transformation (log10(1 + x)) to compress
        extreme values while preserving relative differences between
        previous pledged amounts.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            float: The transformed previous pledged amount
        """
        try:
            # Get the raw value
            pledged = float(campaign.get('previous_pledged', 0))

            # Log the source value
            print(f"Previous pledged raw value: {pledged}")

            # Apply the logarithmic transformation if the value is positive
            if pledged > 0:
                transformed_pledge = np.log1p(pledged) / np.log(10)
                print(f"Applied log transformation to previous_pledged: {pledged} → {transformed_pledge}")
                return transformed_pledge
            print(f"No transformation applied to previous_pledged: value is {pledged}")
            return 0.0
        except Exception as e:
            print(f"Error processing pledge amount for campaign {idx}: {str(e)}")
            return 0.0

    def calculate_previous_success_rate(self, campaign: Dict, idx: int) -> float:
        """
        Calculate the success rate of the creator's previous campaigns.

        Computes the ratio of successful previous projects to total previous
        projects. Uses a directly provided 'previous_success_rate' when
        available; otherwise calculates the rate from
        'previous_successful_projects' and 'previous_projects_count'.

        Args:
            campaign (Dict): Campaign data
            idx (int): Index of the campaign

        Returns:
            float: The previous success rate (0-1)
        """
        try:
            # First check whether the success rate is provided directly
            if 'previous_success_rate' in campaign:
                rate = float(campaign.get('previous_success_rate', 0))
                print(f"Using provided previous_success_rate directly: {rate}")
                return rate

            # Otherwise calculate it from successful projects and total projects
            previousProjects = float(campaign.get('previous_projects_count', 0))
            previousSuccessfulProjects = float(campaign.get('previous_successful_projects', 0))

            # Log the values used for the calculation (for debugging)
            print(f"Calculating success rate from: projects={previousProjects}, successful={previousSuccessfulProjects}")

            if previousProjects == 0.0:
                return 0.0
            return previousSuccessfulProjects / previousProjects
        except Exception as e:
            print(f"Error calculating previous success rate for campaign {idx}: {str(e)}")
            return 0.0
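
    # Worked example for the fallback calculation above (illustrative): a
    # creator with previous_projects_count=4 and previous_successful_projects=3
    # gets a previous_success_rate of 3 / 4 = 0.75.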

    def process_campaign(self, campaign: Dict, idx: int) -> Dict:
        """
        Process a single campaign to prepare all required features for prediction.

        This is the main method that processes a raw campaign and prepares all
        features (embeddings and numerical) for the prediction model.

        Processing steps include:
        - Text embedding generation using the appropriate models
        - Category and country embedding through GloVe
        - Logarithmic transformation of monetary values
        - Normalization of numerical features

        Args:
            campaign (Dict): Raw campaign data
            idx (int): Index of the campaign

        Returns:
            Dict: Processed data with all features ready for prediction
        """
        self._ensure_models_loaded()

        # Log the incoming campaign data for debugging
        print(f"Processing campaign {idx} with keys: {list(campaign.keys())}")

        # Generate embeddings for the text fields
        description_embedding, calculated_description_length = self.process_description_embedding(campaign, idx)

        # Use the existing value for description_length if present, otherwise the calculated one
        description_length = campaign.get('description_length', calculated_description_length)

        # Create the processed data dictionary with embeddings and numerical features
        result = {
            'description_embedding': description_embedding.tolist(),
            'description_length': description_length,
            'blurb_embedding': self.process_blurb(campaign, idx).tolist(),
            'risk_embedding': self.process_riskandchallenges_embedding(campaign, idx).tolist(),
            'category_embedding': self.process_category(campaign),
            'subcategory_embedding': self.process_subcategory_embedding(campaign, idx).tolist(),
            'country_embedding': self.process_country_embedding(campaign, idx).tolist()
        }

        # Process financial features with the logarithmic transformation
        result['funding_goal'] = self.process_funding_goal(campaign, idx)
        result['previous_funding_goal'] = self.process_previous_funding_goal(campaign, idx)
        result['previous_pledged'] = self.process_previous_pledged(campaign, idx)

        # Calculate the success rate based on previous projects,
        # handling both direct values and calculated values
        result['previous_success_rate'] = self.calculate_previous_success_rate(campaign, idx)

        # Extract simple integer features
        for field in ['image_count', 'video_count', 'campaign_duration']:
            result[field] = int(campaign.get(field, 0))

        # Special handling for previous_projects_count to ensure consistency
        if 'previous_projects_count' in campaign:
            # Use the value directly from the input
            result['previous_projects_count'] = int(campaign.get('previous_projects_count', 0))
            print(f"Using provided previous_projects_count: {result['previous_projects_count']}")
        else:
            # Default to 0 if not provided
            result['previous_projects_count'] = 0

        # Log the final result for debugging
        print(f"Processed campaign with previous metrics: " +
              f"count={result.get('previous_projects_count')}, " +
              f"rate={result.get('previous_success_rate')}, " +
              f"pledged={result.get('previous_pledged')}, " +
              f"goal={result.get('previous_funding_goal')}")

        return result
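

if __name__ == "__main__":
    # Minimal usage sketch (illustrative): the keys below mirror the fields
    # the processing methods above read, but the values are hypothetical.
    # Running this will download the Longformer, MiniLM, and GloVe models on
    # first use.
    sample_campaign = {
        "raw_description": "A modular synthesizer kit designed for beginners.",
        "raw_blurb": "Learn sound synthesis by building your own instrument.",
        "raw_risks": "Component shortages could delay manufacturing.",
        "raw_category": "Technology",
        "raw_subcategory": "Gadgets",
        "raw_country": "United States",
        "funding_goal": 25000,
        "image_count": 5,
        "video_count": 1,
        "campaign_duration": 30,
        "previous_projects_count": 2,
        "previous_successful_projects": 1,
        "previous_funding_goal": 10000,
        "previous_pledged": 12000,
    }

    processor = CampaignProcessor(data=[sample_campaign], lazy_load=True)
    features = processor.process_campaign(sample_campaign, idx=0)
    print(f"Feature keys: {sorted(features.keys())}")
    print(f"Description embedding size: {len(features['description_embedding'])}")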