"""
Campaign Data Processor for Kickstarter Prediction
This module handles the preprocessing of raw Kickstarter campaign data,
generating text embeddings and preparing numerical features for prediction.
Key functionality:
- Longformer embeddings for project descriptions
- Sentence transformer embeddings for blurbs and risk statements
- GloVe embeddings for categories and countries
- Normalization of numerical features
Author: Angus Fung
Date: April 2025
"""
import os
# Set gensim data directory to a writable location at the very start
os.environ['GENSIM_DATA_DIR'] = '/tmp/gensim-data'
try:
os.makedirs('/tmp/gensim-data', exist_ok=True)
print(f"Created directory at {os.environ['GENSIM_DATA_DIR']}")
except Exception as e:
print(f"Error creating gensim directory: {str(e)}")
import json
import numpy as np
from typing import Dict, List, Tuple, Any, Optional
import torch
from transformers import AutoTokenizer, AutoModel
import gc
import gensim.downloader
class CampaignProcessor:
"""
Processor for Kickstarter campaign data.
This class handles the preprocessing of raw campaign data, transforming
text and categorical features into embeddings using various NLP models
and preparing numerical features for the prediction model.
"""
def __init__(self, data: List[Dict], lazy_load: bool = False):
"""
Initialize the CampaignProcessor.
Args:
data (List[Dict]): List of campaign dictionaries to process
lazy_load (bool): If True, models will be loaded only when needed
rather than at initialization time
"""
self.data = data
self.categories = sorted(list(set(camp.get('raw_category', '') for camp in self.data)))
self.lazy_load = lazy_load
# Country name to ISO alpha-2 code mapping
# First letter of each word is capitalized in the original data
self.country_to_alpha2 = {
# Main ISO country names
"United States": "US",
"United Kingdom": "GB",
"Canada": "CA",
"Australia": "AU",
"New Zealand": "NZ",
"Germany": "DE",
"France": "FR",
"Italy": "IT",
"Spain": "ES",
"Netherlands": "NL",
"Sweden": "SE",
"Denmark": "DK",
"Norway": "NO",
"Ireland": "IE",
"Switzerland": "CH",
"Austria": "AT",
"Belgium": "BE",
"Luxembourg": "LU",
"Hong Kong": "HK",
"Singapore": "SG",
"Mexico": "MX",
"Japan": "JP",
"China": "CN",
"Brazil": "BR",
"India": "IN",
"South Korea": "KR",
"South Africa": "ZA",
"Argentina": "AR",
"Poland": "PL",
"Portugal": "PT",
"Russia": "RU",
"Greece": "GR",
"Czech Republic": "CZ",
"Finland": "FI",
"Hungary": "HU",
"Romania": "RO",
"Thailand": "TH",
"Turkey": "TR",
"Ukraine": "UA",
"Colombia": "CO",
"Chile": "CL",
"Peru": "PE",
"Malaysia": "MY",
"Vietnam": "VN",
"Indonesia": "ID",
"Philippines": "PH",
"United Arab Emirates": "AE",
"Saudi Arabia": "SA",
"Israel": "IL",
"Egypt": "EG",
"Nigeria": "NG",
"Kenya": "KE",
# Common variants and abbreviations
"USA": "US",
"U.S.A.": "US",
"U.S.": "US",
"UK": "GB",
"U.K.": "GB",
"Great Britain": "GB",
"England": "GB",
"Republic Of Korea": "KR",
"Korea": "KR",
"Republic Of China": "CN",
"Republic Of India": "IN",
"UAE": "AE",
"Russia": "RU",
"Russian Federation": "RU",
"The Netherlands": "NL",
"Holland": "NL",
"Republic Of Ireland": "IE",
"Czech": "CZ",
"Czechia": "CZ",
}
# Create a lowercase version of the dictionary for case-insensitive lookups
self.country_to_alpha2_lower = {k.lower(): v for k, v in self.country_to_alpha2.items()}
# Initialize model variables (to be loaded later)
self.tokenizer = None # Longformer tokenizer for descriptions
self.model = None # Longformer model for descriptions
self.RiskandBlurb_tokenizer = None # MiniLM tokenizer for blurb and risk
self.RiskandBlurb_model = None # MiniLM model for blurb and risk
self.glove = None # GloVe word vectors for categories and countries
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Load models at initialization if not using lazy loading
if not lazy_load:
self._load_models()
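    # Note: with lazy_load=True, construction is cheap and the heavyweight
    # model downloads are deferred until the first process_* call (via
    # _ensure_models_loaded below).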
def _load_models(self):
"""
Load all NLP models required for processing campaign data.
This method loads:
- Longformer for description embeddings
- MiniLM for blurb and risk embeddings
- GloVe for category and country embeddings
Models are cached to avoid reloading and moved to the appropriate device.
"""
print("Loading NLP models...")
        # Cache models locally to avoid re-downloading on every run. transformers
        # reads these env vars at import time, so cache_dir is also passed
        # explicitly to each from_pretrained call below.
        cache_dir = "/tmp/model_cache"
        os.environ["TRANSFORMERS_CACHE"] = cache_dir
        os.environ["HF_HOME"] = cache_dir
try:
os.makedirs(cache_dir, exist_ok=True)
print(f"Created cache directory at {cache_dir}")
except Exception as e:
print(f"Error creating cache directory: {str(e)}")
# Initialize Longformer model and tokenizer (for processing description)
model_name = "allenai/longformer-base-4096"
print(f"Loading {model_name}...")
try:
# Add internet connectivity check
try:
import requests
print("Testing internet connectivity...")
response = requests.get("https://huggingface.co", timeout=5)
if response.status_code == 200:
print("Successfully connected to huggingface.co")
else:
print(f"Error connecting to huggingface.co: {response.status_code}")
except Exception as e:
print(f"Network connectivity test failed: {str(e)}")
# Check if directory exists and is writable
if os.path.exists(cache_dir):
print(f"Cache directory {cache_dir} exists")
if os.access(cache_dir, os.W_OK):
print(f"Cache directory {cache_dir} is writable")
else:
print(f"Cache directory {cache_dir} is not writable")
else:
print(f"Cache directory {cache_dir} does not exist")
            # Load the tokenizer with an explicit cache_dir parameter
            # (AutoTokenizer is already imported at module level)
            print(f"Initializing tokenizer from {model_name} with cache_dir={cache_dir}")
            self.tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=cache_dir)
print(f"Tokenizer loaded successfully")
            # Load the model with the same explicit cache_dir parameter
            print(f"Initializing model from {model_name} with cache_dir={cache_dir}")
            self.model = AutoModel.from_pretrained(model_name, cache_dir=cache_dir)
print(f"Model loaded successfully")
        except Exception as e:
            print(f"Error loading Longformer model: {str(e)}")
            # Re-raise: the description embedding model is required downstream
            raise
try:
# Initialize minilm model and tokenizer (for processing risk and blurb)
            RiskandBlurb_model_name = "sentence-transformers/all-MiniLM-L6-v2"
print(f"Loading {RiskandBlurb_model_name}...")
self.RiskandBlurb_tokenizer = AutoTokenizer.from_pretrained(RiskandBlurb_model_name, cache_dir=cache_dir)
self.RiskandBlurb_model = AutoModel.from_pretrained(RiskandBlurb_model_name, cache_dir=cache_dir)
print(f"RiskandBlurb model loaded successfully")
        except Exception as e:
            print(f"Error loading MiniLM model: {str(e)}")
            raise
try:
# Load GloVe model for country and subcategory embeddings
print("Loading GloVe model...")
# GENSIM_DATA_DIR is already set at the top of the file
print(f"Using GENSIM_DATA_DIR: {os.environ.get('GENSIM_DATA_DIR', 'Not set')}")
self.glove = gensim.downloader.load('glove-wiki-gigaword-100')
print("GloVe model loaded successfully")
        except Exception as e:
            print(f"Error loading GloVe model: {str(e)}")
            raise
try:
# Move models to device
self.model = self.model.to(self.device)
self.RiskandBlurb_model = self.RiskandBlurb_model.to(self.device)
print("All models loaded successfully.")
        except Exception as e:
            print(f"Error moving models to device: {str(e)}")
            raise
def _ensure_models_loaded(self):
"""
Ensure all required models are loaded.
This is called before any processing to make sure models are ready,
particularly important when using lazy loading.
"""
        if any(m is None for m in (
            self.model, self.tokenizer,
            self.RiskandBlurb_model, self.RiskandBlurb_tokenizer,
            self.glove,
        )):
self._load_models()
def _process_text_embedding(self, text: str, max_length: int, tokenizer: AutoTokenizer, model: AutoModel) -> np.ndarray:
"""
Generate embedding for text using the specified model and tokenizer.
This method handles tokenization, model inference, and pooling to
create a single vector representation of the input text.
Args:
text (str): Text to embed
max_length (int): Maximum token length for the model
tokenizer (AutoTokenizer): Tokenizer to use
model (AutoModel): Model to use for embedding generation
Returns:
np.ndarray: Embedding vector for the text
"""
# Clean up memory before processing
if self.device.type == 'cuda':
torch.cuda.empty_cache()
gc.collect()
# Tokenize the text
inputs = tokenizer(text,
padding=True,
truncation=True,
max_length=max_length,
return_tensors="pt")
# Move inputs to device
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Generate embeddings
with torch.no_grad():
outputs = model(**inputs)
# Mean pooling - take average of all token embeddings
attention_mask = inputs['attention_mask']
token_embeddings = outputs.last_hidden_state
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
sentence_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
# Convert to numpy array
embedding = sentence_embeddings.cpu().numpy()
# Clean up to prevent memory leaks
del inputs, outputs, token_embeddings, sentence_embeddings
if self.device.type == 'cuda':
torch.cuda.empty_cache()
gc.collect()
return embedding[0]
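    # Illustrative sketch of the masked mean pooling above: for a batch of one
    # sequence with three tokens where the third is padding,
    #   token_embeddings has shape (1, 3, H) and attention_mask == [[1, 1, 0]],
    # so the pooled vector averages only the two real token embeddings; the
    # clamp(min=1e-9) guards against division by zero for all-padding input.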
def _get_glove_embedding(self, text: str, dim: int = 100) -> np.ndarray:
"""
Generate GloVe embedding for a text by averaging word vectors.
Args:
text (str): Text to embed
dim (int): Dimension of the GloVe embeddings
Returns:
np.ndarray: Averaged GloVe embedding for the text
"""
if not text:
return np.zeros(dim)
# Normalize and split text
text = text.lower().strip()
words = text.split()
vectors = []
# Collect vectors for words that exist in the vocabulary
for word in words:
if word in self.glove:
vectors.append(self.glove[word])
# Average vectors if any exist, otherwise return zeros
if vectors:
return np.mean(vectors, axis=0)
else:
return np.zeros(dim)
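    # Example (hypothetical input): _get_glove_embedding("tabletop games")
    # averages the GloVe vectors for "tabletop" and "games"; out-of-vocabulary
    # words are skipped, and an empty or fully-OOV string yields np.zeros(dim).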
def process_description_embedding(self, campaign: Dict, idx: int) -> Tuple[np.ndarray, int]:
"""
Process the project description to generate a Longformer embedding.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
Tuple containing:
- np.ndarray: Longformer embedding of the description
- int: Word count of the description
"""
self._ensure_models_loaded()
try:
text = campaign.get("raw_description", '')
description_length = len(text.split())
embedding = self._process_text_embedding(text, 4096, self.tokenizer, self.model)
return embedding, description_length
except Exception as e:
print(f"Error processing description: {str(e)}")
return np.zeros(768), 0
def process_riskandchallenges_embedding(self, campaign: Dict, idx: int) -> np.ndarray:
"""
Process the risks and challenges section to generate a MiniLM embedding.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
np.ndarray: MiniLM embedding of the risks section
"""
self._ensure_models_loaded()
try:
text = campaign.get("raw_risks", '')
return self._process_text_embedding(text, 512, self.RiskandBlurb_tokenizer, self.RiskandBlurb_model)
except Exception as e:
print(f"Error processing risk statement: {str(e)}")
return np.zeros(384)
def process_blurb(self, campaign: Dict, idx: int) -> np.ndarray:
"""
Process the project blurb to generate a MiniLM embedding.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
np.ndarray: MiniLM embedding of the blurb
"""
self._ensure_models_loaded()
try:
text = campaign.get("raw_blurb", '')
return self._process_text_embedding(text, 512, self.RiskandBlurb_tokenizer, self.RiskandBlurb_model)
except Exception as e:
print(f"Error processing blurb: {str(e)}")
return np.zeros(384)
def process_category(self, campaign: Dict) -> List[int]:
"""
Process the project category into a one-hot encoding.
Args:
campaign (Dict): Campaign data
Returns:
List[int]: One-hot encoding of the category
"""
try:
# All categories in the dataset
fixed_categories = [
"Art", "Comics", "Crafts", "Dance", "Design", "Fashion",
"Film & Video", "Food", "Games", "Journalism", "Music",
"Photography", "Publishing", "Technology", "Theater"
]
category = campaign.get('raw_category', '')
# Create one-hot encoding
encoding = [1 if cat == category else 0 for cat in fixed_categories]
return encoding
except Exception as e:
print(f"Error processing category: {str(e)}")
return [0] * 15
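    # Example: raw_category "Games" yields
    # [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0] (index 8 of the 15 fixed
    # categories); an unrecognized category yields the all-zero vector.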
def process_subcategory_embedding(self, campaign: Dict, idx: int) -> np.ndarray:
"""
Process the project subcategory to generate a GloVe embedding.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
np.ndarray: GloVe embedding of the subcategory
"""
self._ensure_models_loaded()
try:
subcategory = campaign.get('raw_subcategory', '')
return self._get_glove_embedding(subcategory)
except Exception as e:
print(f"Error processing subcategory: {str(e)}")
return np.zeros(100)
def _convert_country_to_alpha2(self, country_name: str) -> str:
"""
Convert a country name to its ISO alpha-2 code.
This helper method handles the conversion with proper logging:
1. Tries exact match first
2. Falls back to case-insensitive match
3. Returns original string if no match found
Args:
country_name (str): Country name to convert
Returns:
str: ISO alpha-2 code (e.g., "US") or original country name if no match
"""
if not country_name:
return ""
# Try exact match first
alpha2_code = self.country_to_alpha2.get(country_name)
# If no exact match, try case-insensitive match
if not alpha2_code:
alpha2_code = self.country_to_alpha2_lower.get(country_name.lower())
# Log results
if alpha2_code:
print(f"Country conversion: '{country_name}' → '{alpha2_code}'")
return alpha2_code
else:
print(f"Country conversion failed: '{country_name}' not found in dictionary")
return country_name
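    # Examples: "United Kingdom" → "GB" (exact match); "uk" → "GB"
    # (case-insensitive fallback); an unmapped name such as the hypothetical
    # "Atlantis" is returned unchanged so the GloVe lookup can still be tried.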
def process_country_embedding(self, campaign: Dict, idx: int) -> np.ndarray:
"""
Process the project country to generate a GloVe embedding.
This method:
1. Extracts the country name from campaign data
2. Converts full country name to ISO alpha-2 code (e.g., "United States" → "US")
3. Generates an embedding using GloVe for the standardized country code
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
np.ndarray: GloVe embedding of the country (as alpha-2 code)
"""
self._ensure_models_loaded()
try:
# Extract country name from campaign data
country_name = campaign.get('raw_country', '')
# Convert to alpha-2 code using helper method
alpha2_code = self._convert_country_to_alpha2(country_name)
# Generate embedding using standardized country code
return self._get_glove_embedding(alpha2_code)
except Exception as e:
print(f"Error processing country for campaign {idx}: {str(e)}")
return np.zeros(100)
def process_funding_goal(self, campaign: Dict, idx: int) -> float:
"""
        Process the campaign funding goal with logarithmic compression.
        Applies a base-10 log1p transform (log10(1 + goal)) to compress extreme
        values while preserving relative differences between funding goals.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
float: The transformed funding goal
"""
try:
goal = float(campaign.get('funding_goal', 0))
# Log1p transformation, good for general compression while preserving relative differences
transformed_goal = np.log1p(goal)/np.log(10)
return transformed_goal
except Exception as e:
print(f"Error processing funding goal for campaign {idx}: {str(e)}")
return 0.0
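    # Worked example: a goal of 9,999 maps to log10(1 + 9999) = 4.0 and a goal
    # of 99,999 maps to ~5.0, so each order of magnitude adds about one unit.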
def process_previous_funding_goal(self, campaign: Dict, idx: int) -> float:
"""
        Process the previous campaign funding goal with logarithmic compression.
        Applies a base-10 log1p transform (log10(1 + goal)) to compress extreme
        values while preserving relative differences between previous funding goals.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
float: The transformed previous funding goal
"""
try:
# Get the raw value
previous_goal = float(campaign.get('previous_funding_goal', 0))
# Log the source value
print(f"Previous funding goal raw value: {previous_goal}")
# Apply logarithmic transformation if value is positive
if previous_goal > 0:
transformed_goal = np.log1p(previous_goal)/np.log(10)
print(f"Applied log transformation to previous_funding_goal: {previous_goal}{transformed_goal}")
return transformed_goal
else:
print(f"No transformation applied to previous_funding_goal: value is {previous_goal}")
return 0.0
except Exception as e:
print(f"Error processing previous funding goal for campaign {idx}: {str(e)}")
return 0.0
def process_previous_pledged(self, campaign: Dict, idx: int) -> float:
"""
        Process the previous campaign pledged amount with logarithmic compression.
        Applies a base-10 log1p transform (log10(1 + pledged)) to compress extreme
        values while preserving relative differences between previous pledged amounts.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
float: The transformed previous pledged amount
"""
try:
# Get the raw value
pledged = float(campaign.get('previous_pledged', 0))
# Log the source value
print(f"Previous pledged raw value: {pledged}")
# Apply logarithmic transformation if value is positive
if pledged > 0:
transformed_pledge = np.log1p(pledged)/np.log(10)
print(f"Applied log transformation to previous_pledged: {pledged}{transformed_pledge}")
return transformed_pledge
else:
print(f"No transformation applied to previous_pledged: value is {pledged}")
return 0.0
except Exception as e:
print(f"Error processing pledge amount for campaign {idx}: {str(e)}")
return 0.0
    def calculate_previous_success_rate(self, campaign: Dict, idx: int) -> float:
"""
Calculate success rate of creator's previous campaigns.
        Uses 'previous_success_rate' directly when provided; otherwise computes
        the ratio of 'previous_successful_projects' to 'previous_projects_count'.
Args:
campaign (Dict): Campaign data
idx (int): Index of the campaign
Returns:
float: The previous success rate (0-1)
"""
try:
# First check if success rate is provided directly
if 'previous_success_rate' in campaign:
# Log the direct usage for debugging
rate = float(campaign.get('previous_success_rate', 0))
print(f"Using provided previous_success_rate directly: {rate}")
return rate
# Otherwise calculate from successful projects and total projects
previousProjects = float(campaign.get('previous_projects_count', 0))
previousSuccessfulProjects = float(campaign.get('previous_successful_projects', 0))
# Log the values used for calculation (for debugging)
print(f"Calculating success rate from: projects={previousProjects}, successful={previousSuccessfulProjects}")
if previousProjects == 0.0:
return 0.0
else:
previous_success_rate = previousSuccessfulProjects / previousProjects
return previous_success_rate
except Exception as e:
print(f"Error calculating previous success rate for campaign {idx}: {str(e)}")
return 0.0
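    # Example: 3 successful projects out of 4 previous projects → 0.75; a
    # creator with no previous projects defaults to 0.0.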
def process_campaign(self, campaign: Dict, idx: int) -> Dict:
"""
Process a single campaign to prepare all required features for prediction.
This is the main method that processes a raw campaign and prepares
all features (embeddings and numerical) for the prediction model.
Processing steps include:
- Text embedding generation using appropriate models
- Category and country embedding through GloVe
- Logarithmic transformation of monetary values
- Normalization of numerical features
Args:
campaign (Dict): Raw campaign data
idx (int): Index of the campaign
Returns:
Dict: Processed data with all features ready for prediction
"""
self._ensure_models_loaded()
# Log the incoming campaign data for debugging
print(f"Processing campaign {idx} with keys: {list(campaign.keys())}")
# Generate embeddings for text fields
description_embedding, calculated_description_length = self.process_description_embedding(campaign, idx)
# Use existing value for description_length if present, otherwise use calculated
description_length = campaign.get('description_length', calculated_description_length)
# Create processed data dictionary with embeddings and numerical features
result = {
'description_embedding': description_embedding.tolist(),
'description_length': description_length,
'blurb_embedding': self.process_blurb(campaign, idx).tolist(),
'risk_embedding': self.process_riskandchallenges_embedding(campaign, idx).tolist(),
'category_embedding': self.process_category(campaign),
'subcategory_embedding': self.process_subcategory_embedding(campaign, idx).tolist(),
'country_embedding': self.process_country_embedding(campaign, idx).tolist()
}
# Process financial features with logarithmic transformation
result['funding_goal'] = self.process_funding_goal(campaign, idx)
result['previous_funding_goal'] = self.process_previous_funding_goal(campaign, idx)
result['previous_pledged'] = self.process_previous_pledged(campaign, idx)
# Calculate success rate based on previous projects
# Ensure both direct values and calculated values are handled
        result['previous_success_rate'] = self.calculate_previous_success_rate(campaign, idx)
# Extract simple integer features, with specific handling for previous_projects_count
for field in ['image_count', 'video_count', 'campaign_duration']:
result[field] = int(campaign.get(field, 0))
# Special handling for previous_projects_count to ensure consistency
if 'previous_projects_count' in campaign:
# Use the value directly from input
result['previous_projects_count'] = int(campaign.get('previous_projects_count', 0))
print(f"Using provided previous_projects_count: {result['previous_projects_count']}")
else:
# Default to 0 if not provided
result['previous_projects_count'] = 0
# Log the final result for debugging
print(f"Processed campaign with previous metrics: " +
f"count={result.get('previous_projects_count')}, " +
f"rate={result.get('previous_success_rate')}, " +
f"pledged={result.get('previous_pledged')}, " +
f"goal={result.get('previous_funding_goal')}")
return result
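

if __name__ == "__main__":
    # Minimal smoke-test sketch: the campaign below is hypothetical, and
    # running it requires network access for the model downloads.
    sample_campaign = {
        "raw_description": "A modular synthesizer kit for beginners.",
        "raw_blurb": "Learn synthesis by building your own instrument.",
        "raw_risks": "Component sourcing delays are the main risk.",
        "raw_category": "Technology",
        "raw_subcategory": "Gadgets",
        "raw_country": "United States",
        "funding_goal": 25000,
        "image_count": 5,
        "video_count": 1,
        "campaign_duration": 30,
        "previous_projects_count": 2,
        "previous_successful_projects": 1,
        "previous_funding_goal": 10000,
        "previous_pledged": 12000,
    }
    processor = CampaignProcessor(data=[sample_campaign], lazy_load=True)
    features = processor.process_campaign(sample_campaign, idx=0)
    print(f"Feature keys: {sorted(features.keys())}")
    print(f"Description embedding dim: {len(features['description_embedding'])}")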