"""
DeepFunding Oracle:
This script dynamically loads dependency data and for each repository URL:
• Fetches GitHub features (stars, forks, watchers, open issues, pull requests, activity) using the GitHub API.
• Uses the LLama model to analyze parent-child behavior (based on the fetched features and parent info)
and returns a base weight (0-1) for the repository.
• Trains a RandomForest regressor on these features (with the base weight as the target) to predict a final weight.
The output submission CSV has three columns: repo, parent, and final_weight.
"""
import concurrent.futures
import json
import logging
import os
import re
import signal
import sys
import time
import warnings
from concurrent.futures import ThreadPoolExecutor

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from scipy.special import log1p
from sklearn.model_selection import GroupKFold, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler, StandardScaler
from tqdm import tqdm
from xgboost import XGBRegressor

from Oracle.SmolLM import SmolLM
warnings.filterwarnings("ignore")
# Configure logging to file and console
logging.basicConfig(
handlers=[
logging.FileHandler("deepfundingoracle.log"),
logging.StreamHandler(sys.stdout)
],
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
def add_temporal_and_ratio_features(df):
    """
    Adds:
      - days_since_update: days between the last GitHub update and today
      - closed_issue_ratio: approximate ratio of closed to total issues
      - merged_pull_ratio: ratio of merged to total pulls, when a 'merged_pulls' count exists
    """
    df['activity'] = pd.to_datetime(df['activity'], errors='coerce', utc=True)
    today = pd.Timestamp.now(tz='UTC')
    df['days_since_update'] = (today - df['activity']).dt.days
    df['days_since_update'] = df['days_since_update'].fillna(df['days_since_update'].median())
    # closed_issue_ratio: the GitHub API response only carries an open-issue count,
    # so the total is approximated as twice the open count (a crude placeholder
    # until a real closed-issue count is fetched).
    total_issues = (df['open_issues'] * 2).replace(0, np.nan)
    df['closed_issue_ratio'] = ((total_issues - df['open_issues']) / total_issues).fillna(0).clip(0, 1)
    # merged_pull_ratio: only computable when a 'merged_pulls' count was fetched.
    if 'merged_pulls' in df.columns:
        df['merged_pull_ratio'] = df['merged_pulls'].clip(lower=0) / df['pulls'].clip(lower=1)
    return df
##############################
# GitHub API helper: Fetch repository metrics
##############################
def fetch_repo_metrics(repo_url):
"""
RATIONALE (Recommendation 2): Fetches GitHub metrics, handling API pagination to get accurate
contributor and pull request counts instead of the default cap of 30. This provides much
more accurate features for popular repositories.
"""
default_metrics = {"stars": 0, "forks": 0, "watchers": 0, "open_issues": 0, "pulls": 0, "activity": pd.NaT,
"contributors": 0}
try:
m = re.search(r"github\.com/([^/]+)/([^/]+)", repo_url)
if not m:
logging.warning(f"Malformed GitHub URL: {repo_url}")
return default_metrics
        owner, repo_name = m.group(1), m.group(2)
        repo_name = re.sub(r"\.git$", "", repo_name)  # tolerate ".git"-suffixed URLs
api_url = f"https://api.github.com/repos/{owner}/{repo_name}"
headers = {}
token = os.environ.get("GITHUB_API_TOKEN")
if token:
headers["Authorization"] = f"token {token}"
r = requests.get(api_url, headers=headers, timeout=15)
r.raise_for_status()
data = r.json()
def get_count_from_pagination(url, headers):
try:
resp = requests.get(f"{url}?per_page=1", headers=headers, timeout=10)
if resp.status_code == 200 and 'Link' in resp.headers:
match = re.search(r'page=(\d+)>; rel="last"', resp.headers['Link'])
if match:
return int(match.group(1))
return len(resp.json()) if resp.status_code == 200 else 0
except requests.exceptions.RequestException:
return 0
return {
"stars": data.get("stargazers_count", 0),
"forks": data.get("forks_count", 0),
"watchers": data.get("subscribers_count", 0), # subscribers_count is a better 'watch' metric
"open_issues": data.get("open_issues_count", 0),
"activity": pd.to_datetime(data.get("updated_at")),
"contributors": get_count_from_pagination(data['contributors_url'], headers),
"pulls": get_count_from_pagination(data['pulls_url'].replace('{/number}', ''), headers)
}
except requests.exceptions.RequestException as e:
logging.error(f"Failed to fetch data for {repo_url}: {e}")
return default_metrics
def fetch_github_features(df):
    """Concurrently fetches GitHub features for all repositories in the DataFrame."""
    logging.info("Fetching GitHub features for repositories...")
    # executor.map preserves input order, so each metrics row stays aligned with
    # its repo; collecting as_completed() results would shuffle rows across repos.
    with ThreadPoolExecutor(max_workers=20) as executor:
        metrics_data = list(tqdm(executor.map(fetch_repo_metrics, df['repo']),
                                 total=len(df), desc="Fetching GitHub Metrics"))
    return pd.concat([df.reset_index(drop=True), pd.DataFrame(metrics_data)], axis=1)
def add_derived_features(df):
"""
RATIONALE (Recommendation 2): Adds derived temporal and interaction features like 'days_since_update'
and 'stars_per_contributor' to give the model more powerful signals to learn from.
"""
logging.info("Engineering derived features...")
    df['activity'] = pd.to_datetime(df['activity'], errors='coerce', utc=True)
    df['days_since_update'] = (pd.Timestamp.now(tz='UTC') - df['activity']).dt.days
    df['days_since_update'] = df['days_since_update'].fillna(df['days_since_update'].median())
df['stars_per_contributor'] = df['stars'] / df['contributors'].clip(lower=1)
df['forks_per_star'] = df['forks'] / df['stars'].clip(lower=1)
numeric_cols = df.select_dtypes(include=np.number).columns
df[numeric_cols] = df[numeric_cols].fillna(0)
return df
def calculate_fallback_weights(df):
"""
Dynamically calculate fallback feature weights based on feature variance.
"""
print("[INFO] Calculating fallback feature weights...")
numeric_cols = ['stars', 'forks', 'watchers', 'open_issues', 'pulls', 'contributors', 'dependencies_count']
# Filter to only include columns that exist in the DataFrame
valid_cols = [col for col in numeric_cols if col in df.columns]
# Create default weights
default_weights = {
'stars': 0.3,
'forks': 0.2,
'watchers': 0.2,
'open_issues': 0.1,
'pulls': 0.1,
'contributors': 0.05,
'dependencies_count': 0.05
}
# If any data exists, calculate variance-based weights
if len(valid_cols) > 0 and df[valid_cols].sum().sum() > 0:
# Calculate variance for each feature
feature_variances = df[valid_cols].var()
total_variance = feature_variances.sum()
# If meaningful variance exists, use it for weights
if total_variance > 0:
weights = {col: var / total_variance for col, var in feature_variances.items()}
# Normalize to ensure sum is 1.0
sum_weights = sum(weights.values())
if sum_weights > 0:
weights = {k: v / sum_weights for k, v in weights.items()}
return weights
# Return default weights if we couldn't calculate meaningful ones
print("[INFO] Using default fallback weights")
return default_weights
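# Hypothetical wiring sketch: these variance-based weights could stand in for
# the LLM weights in assign_base_weight when the model is unavailable, e.g.:
#   weights = calculate_fallback_weights(df)
#   df["base_weight_raw"] = sum(df[f] * w for f, w in weights.items() if f in df)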
##############################
# Feature Extraction
##############################
def load_data(file):
"""
Dynamically load the dependency data CSV from the uploaded file.
Expects at least "repo" and "parent" columns.
"""
try:
print("[INFO] Loading data from uploaded file...")
start_time = time.time()
# Read the uploaded file directly into a DataFrame
df = pd.read_csv(file)
end_time = time.time()
print(f"[INFO] Data loaded successfully in {end_time - start_time:.2f} seconds.")
return df
except Exception as e:
print("[ERROR] Error loading data:", e)
return None
def timeout_handler(signum, frame):
    # Intended for use with signal.signal(signal.SIGALRM, timeout_handler) to
    # bound LLM prediction time; note it is not registered anywhere yet.
    raise TimeoutError("LLM prediction timed out.")
def assign_base_weight(df):
"""
Assigns a robust `base_weight` using an LLM with a specific persona and JSON output,
then applies log transformation before normalization.
"""
logging.info("Assigning robust base weights using LLM...")
oracle = SmolLM()
# RATIONALE (Recommendation 1): This prompt is highly specific. It sets a persona (VC), defines
# the goal (assess health), prioritizes metrics, and demands a strict JSON output. This
# leads to a much higher quality and more reliable response from the LLM.
prompt = (
"As an expert venture capitalist specializing in open-source software, your goal is to assess a project's "
"overall health, community engagement, and development velocity. Based on this, assign a numeric importance "
"weight to each of the following GitHub metrics: 'stars', 'forks', 'watchers', 'open_issues', 'pulls', "
"'contributors', and 'days_since_update'. "
"Prioritize metrics indicating active, collaborative development (like contributors, pulls, recent updates) "
"over simple popularity metrics (like stars). The 'days_since_update' metric is inverse; lower is better, so it should have a negative weight. "
"The absolute values of the weights should sum to approximately 1. "
"Provide your answer ONLY in a strict JSON format. Example: "
'{"stars": 0.2, "forks": 0.1, "watchers": 0.05, "pulls": 0.2, "open_issues": 0.1, "contributors": 0.25, "days_since_update": -0.1}'
)
feature_weights = None
try:
response_text = oracle.predict(prompt)
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if not json_match:
            raise ValueError("No JSON object found in the LLM response.")
feature_weights = json.loads(json_match.group(0))
logging.info(f"Successfully parsed feature weights from LLM: {feature_weights}")
except Exception as e:
logging.error(f"Failed to parse LLM response, using fallback weights. Error: {e}")
feature_weights = {'stars': 0.15, 'forks': 0.1, 'watchers': 0.05, 'pulls': 0.25, 'open_issues': 0.1,
'contributors': 0.25, 'days_since_update': -0.1}
df["base_weight_raw"] = sum(df[feature] * weight for feature, weight in feature_weights.items() if feature in df)
# RATIONALE (Recommendation 1): Log-transforming the raw score before scaling prevents extreme
# outliers from dominating the normalization process, creating a more stable target variable.
df['base_weight_log'] = np.log1p(df['base_weight_raw'] - df['base_weight_raw'].min())
df['base_weight'] = df.groupby("parent")["base_weight_log"].transform(
lambda s: (s - s.min()) / (s.max() - s.min() if s.max() > s.min() else 1)
).fillna(0.5)
return df
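# Worked sketch of the log + per-parent min-max step (hypothetical numbers):
# raw scores [0, 9, 99] under one parent become log1p values [0, ~2.30, ~4.61],
# which min-max scale to [0, ~0.5, 1.0]; without the log step the middle repo
# would collapse to ~0.09, letting the 99 outlier dominate its siblings.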
def sanity_check_weights(df):
    """
    Sanity-checks LLM weights by blending them with a simple popularity score.
    """
    print("[INFO] Performing sanity check on LLM weights...")
    df["sanity_check_weight"] = (df["stars"] + df["forks"] + df["watchers"]) / 3
    # Min-max scale the popularity score per parent so it lives on the same 0-1
    # scale as base_weight before the two are averaged.
    df["sanity_check_weight"] = df.groupby("parent")["sanity_check_weight"].transform(
        lambda s: (s - s.min()) / (s.max() - s.min() if s.max() > s.min() else 1))
    df["ensemble_weight"] = (df["base_weight"] + df["sanity_check_weight"]) / 2
    print("[INFO] Sanity check and ensemble weights added.")
    return df
def visualize_feature_distributions(df):
"""
Visualizes feature distributions and correlations.
"""
print("[INFO] Visualizing feature distributions and correlations...")
    # matplotlib and seaborn are imported unconditionally at the top of this
    # module, so no availability guard is needed before plotting.
numeric_cols = df.select_dtypes(include=[np.number]).columns
# Plot feature distributions
df[numeric_cols].hist(bins=20, figsize=(15, 10), color="skyblue", edgecolor="black")
plt.suptitle("Feature Distributions", fontsize=16)
plt.show()
# Plot feature correlations
correlation_matrix = df[numeric_cols].corr()
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5)
plt.title("Feature Correlation Matrix", fontsize=16)
plt.show()
def normalize_and_clip_weights(df, group_col="parent", weight_col="final_weight"):
"""
Ensures weights are non-negative and sum to 1 per group.
"""
if df is None:
raise ValueError("DataFrame is None, cannot normalize weights.")
if weight_col not in df.columns:
raise KeyError(f"`{weight_col}` column not found in DataFrame.")
# Clip negatives
df[weight_col] = df[weight_col].clip(lower=0)
# Normalize within each group
def normalize_group(x):
total = x.sum()
if total > 0:
return x / total
return np.ones_like(x) / len(x)
df[weight_col] = df.groupby(group_col)[weight_col].transform(normalize_group)
return df
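# Quick sanity sketch (hypothetical numbers): weights [2, 1, 1] under one parent
# normalize to [0.5, 0.25, 0.25], while an all-zero group falls back to a
# uniform 1/3 split.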
def normalize_funding(df):
"""
Normalize funding weights for child repositories grouped by parent.
"""
print("[INFO] Normalizing funding weights...", flush=True)
if df is None or df.empty:
print("[WARN] Skipping normalization: DataFrame is None or empty.", flush=True)
return df
df = normalize_and_clip_weights(df)
print("[INFO] Funding weights normalized successfully.", flush=True)
return df
def prepare_dataset(file):
print("[INFO] Starting dataset preparation...")
start_time = time.time()
df = load_data(file)
if df is None:
raise ValueError("Failed to load data.")
if not {"repo", "parent"}.issubset(df.columns):
raise ValueError("Input CSV must contain 'repo' and 'parent' columns.")
print("[INFO] Fetching GitHub features...")
df = fetch_github_features(df)
print("[INFO] GitHub features fetched successfully.")
print("[INFO] Cleaning data...")
df = clean_data(df)
print("[INFO] Data cleaned successfully.")
print("[INFO] Assigning base weights using LLama model...")
df = assign_base_weight(df)
df = sanity_check_weights(df) # Add sanity-check and ensemble weights
df = train_predict_weight(df)
if df is not None and not df.empty:
visualize_feature_distributions(df)
else:
print("[WARN] DataFrame is empty after processing. Skipping visualization.")
df = normalize_funding(df)
end_time = time.time()
print(f"[INFO] Dataset preparation completed in {end_time - start_time:.2f} seconds.")
return df
##############################
# Data Cleaning
##############################
def clean_data(df):
"""
Cleans the input DataFrame by handling missing values and removing outliers.
"""
# Impute missing values
df.fillna(df.median(numeric_only=True), inplace=True)
# Remove extreme outliers using quantiles
for col in df.select_dtypes(include=[np.number]).columns:
q1 = df[col].quantile(0.25)
q3 = df[col].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
return df
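# IQR clipping sketch (hypothetical column): q1=10 and q3=30 give iqr=20, so
# values are clipped into [10 - 30, 30 + 30] = [-20, 60]; a 500-star outlier
# becomes 60 instead of being dropped.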
##############################
# Feature Validation and Scaling
##############################
def validate_features(df):
"""
Validates and scales features to ensure they are meaningful for model training.
"""
print("[INFO] Validating and scaling features...")
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
        df[col] = log1p(df[col].clip(lower=0))
scaler = StandardScaler()
# Scale numeric features
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
print("[INFO] Features scaled successfully.")
return df
def validate_target(df):
"""
Validates the target variable to ensure it has sufficient variance.
If variance is insufficient, adds small random noise to create variance.
"""
print("[INFO] Validating target variable 'base_weight'...")
target = "base_weight"
if target not in df.columns:
raise ValueError(f"Target variable '{target}' not found in DataFrame.")
variance = df[target].var()
print(f"[DEBUG] Target variable variance: {variance}")
if variance < 1e-6:
print("[WARN] Target variable has insufficient variance. Adding small random noise...")
# Add small random noise to introduce variance
np.random.seed(42) # For reproducibility
noise = np.random.normal(0.5, 0.1, size=len(df))
df[target] = noise
print(f"[INFO] New target variable variance: {df[target].var()}")
return df
##############################
# Model Training and Prediction
##############################
def train_predict_weight(df):
"""
Trains an XGBoost Regressor with GroupKFold cross-validation and extensive hyperparameter tuning.
"""
logging.info("Starting model training with GroupKFold validation...")
target_col = 'base_weight'
drop_cols = ["repo", "parent", "activity", "base_weight_raw", "base_weight_log", target_col]
feature_cols = [col for col in df.select_dtypes(include=np.number).columns if col not in drop_cols]
X = df[feature_cols].copy()
y = df[target_col]
groups = df['parent']
# RATIONALE (Recommendation 2): Log-transforming skewed input features helps the model by
# making their distributions more normal, improving the performance of the regressor.
skewed_features = ['stars', 'forks', 'watchers', 'open_issues', 'pulls', 'contributors', 'stars_per_contributor']
for col in skewed_features:
if col in X.columns:
X[col] = np.log1p(X[col])
pipeline = Pipeline([("scaler", RobustScaler()),
("xgb", XGBRegressor(objective="reg:squarederror", n_jobs=-1, random_state=42, verbosity=0))])
param_dist = {'xgb__n_estimators': [100, 300, 500, 700], 'xgb__max_depth': [3, 5, 7, 9],
'xgb__learning_rate': [0.01, 0.02, 0.05, 0.1], 'xgb__subsample': [0.6, 0.7, 0.8, 0.9],
'xgb__colsample_bytree': [0.6, 0.7, 0.8, 0.9]}
# RATIONALE (Recommendation 3): GroupKFold ensures that all repos from the same parent are in the
# same fold. This prevents data leakage and gives a realistic measure of true performance.
cv = GroupKFold(n_splits=5)
# RATIONALE (Recommendation 4): Increasing n_iter explores more hyperparameter combinations,
# increasing the chance of finding a better-performing model.
search = RandomizedSearchCV(
pipeline, param_distributions=param_dist, n_iter=50, cv=cv.split(X, y, groups),
scoring="neg_root_mean_squared_error", verbose=1, n_jobs=-1, random_state=42
)
search.fit(X, y)
best_model = search.best_estimator_
logging.info(f"Best CV score (neg RMSE): {search.best_score_:.4f}")
logging.info(f"Best parameters found: {search.best_params_}")
df['final_weight'] = best_model.predict(X)
df['final_weight'] = df['final_weight'].clip(lower=0)
df['final_weight'] = df.groupby("parent")['final_weight'].transform(
lambda w: w / w.sum() if w.sum() > 0 else np.ones_like(w) / len(w))
return df
##############################
# CSV Output
##############################
def create_submission_csv(df, output_filename="submission.csv"):
"""Saves the final predictions to a CSV file."""
logging.info(f"Writing final results to {output_filename}...")
df[["repo", "parent", "final_weight"]].to_csv(output_filename, index=False)
logging.info(f"Successfully created {output_filename}.")
if __name__ == "__main__":
if 'GITHUB_API_TOKEN' not in os.environ:
logging.warning("GITHUB_API_TOKEN environment variable not set. API rate limits will be low.")
input_file = "input.csv"
output_file = "submission_enhanced.csv"
if not os.path.exists(input_file):
logging.error(f"Input file not found: {input_file}. Please create it with 'repo' and 'parent' columns.")
sys.exit(1)
logging.info("--- Starting DeepFunding Oracle - Enhanced Process ---")
# Execute the full pipeline
main_df = pd.read_csv(input_file)
main_df = fetch_github_features(main_df)
main_df = add_derived_features(main_df)
main_df = assign_base_weight(main_df)
main_df = train_predict_weight(main_df)
create_submission_csv(main_df, output_file)
logging.info("--- Process Completed Successfully ---")