#!/usr/bin/env python3
"""
Script to transform your production database into HuggingFace dataset format.
Follows the same pattern as FutureBench's convert_to_csv.py, but simplified.
"""
import os
import sys
import tempfile
from datetime import datetime

import pandas as pd
from huggingface_hub import HfApi

# Add the project root (three directory levels up) to sys.path to allow imports (same as convert_to_csv.py)
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

# Import configuration
from config_db import HF_CONFIG, PROCESSING_CONFIG

# Import FutureBench models and database (same as convert_to_csv.py)
from future_bench.database import get_session
from future_bench.models import EventBase, Prediction
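# For reference, this script only touches the config_db keys shown below. The
# sketch is illustrative (placeholder values, inferred from usage in this file,
# not the real contents of config_db.py):
#
#   HF_CONFIG = {
#       "token": os.environ.get("HF_TOKEN"),           # HuggingFace API token
#       "data_repo": "your-org/your-data-dataset",     # dataset repo for data.csv
#       "results_repo": "your-org/your-results-repo",  # dataset repo for results.csv
#   }
#   PROCESSING_CONFIG = {
#       "event_types": ["..."],     # event types to keep
#       "exclude_models": ["..."],  # test/debug algorithm names to drop
#   }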


def datetime_to_string(dt):
    """Convert a datetime to an ISO-8601 string, or return "" if None (same as convert_to_csv.py)."""
    return dt.isoformat() if dt else ""


def extract_events_and_predictions(session):
    """
    Extract events and predictions from your database.
    Uses the same SQLAlchemy ORM approach as convert_to_csv.py.
    """
    # Get all events (same as convert_to_csv.py)
    events = session.query(EventBase).all()
    if not events:
        print("No events found in the database.")
        return pd.DataFrame()

    # Get all predictions (same as convert_to_csv.py)
    predictions = session.query(Prediction).all()
    if not predictions:
        print("No predictions found in the database.")
        return pd.DataFrame()

    # Create a combined view (same logic as convert_to_csv.py). This is an
    # in-memory join: for each resolved event, collect all of its predictions.
    combined_data = []
    for event in events:
        if event.result is None:  # Skip unresolved events
            continue
        event_predictions = [p for p in predictions if p.event_id == event.id]
        for pred in event_predictions:
            combined_data.append(
                {
                    "event_id": event.id,
                    "question": event.question,
                    "event_type": event.event_type,
                    "open_to_bet_until": datetime_to_string(event.open_to_bet_until),
                    "result": event.result,
                    "algorithm_name": pred.algorithm_name,
                    "actual_prediction": pred.actual_prediction,
                    "prediction_created_at": datetime_to_string(pred.created_at),
                }
            )

    return pd.DataFrame(combined_data)
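# Note: the extraction above assumes the FutureBench ORM models expose at least
# these attributes (inferred from their usage here, not from future_bench itself):
#   EventBase:  id, question, event_type, open_to_bet_until, result
#   Prediction: event_id, algorithm_name, actual_prediction, created_at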


def transform_to_standard_format(df):
    """
    Transform your raw data into the standard format expected by your leaderboard.
    This should match the CSV format your leaderboard already expects.
    """
    # Convert date columns with flexible parsing (handles mixed microsecond precision)
    df["open_to_bet_until"] = pd.to_datetime(df["open_to_bet_until"], format="mixed")
    df["prediction_created_at"] = pd.to_datetime(df["prediction_created_at"], format="mixed")

    # Add any additional columns your leaderboard expects
    df["source"] = "your-app"  # Add source identifier

    # Filter to data starting from June 12th
    cutoff_date = datetime(2025, 6, 12)
    df = df[df["prediction_created_at"] >= cutoff_date]
    print(f"   Filtered to predictions created from {cutoff_date.strftime('%B %d, %Y')} onwards: {len(df)} records remaining")

    # Filter by event types
    df = df[df["event_type"].isin(PROCESSING_CONFIG["event_types"])]

    # Exclude test models
    df = df[~df["algorithm_name"].isin(PROCESSING_CONFIG["exclude_models"])]

    # Calculate accuracy per model and event type (for the results summary):
    # count predictions per group and count how many match the event result.
    accuracy_df = (
        df.groupby(["algorithm_name", "event_type"])
        .agg(
            {
                "actual_prediction": "count",
                "result": lambda x: (df.loc[x.index, "actual_prediction"] == x).sum(),
            }
        )
        .rename(columns={"actual_prediction": "total_predictions", "result": "correct_predictions"})
        .reset_index()
    )
    accuracy_df["accuracy"] = accuracy_df["correct_predictions"] / accuracy_df["total_predictions"]

    return df, accuracy_df
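# For orientation, the two frames returned above end up with these columns
# (read off the construction code, not from any external spec):
#   df:          event_id, question, event_type, open_to_bet_until, result,
#                algorithm_name, actual_prediction, prediction_created_at, source
#   accuracy_df: algorithm_name, event_type, total_predictions,
#                correct_predictions, accuracy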


def upload_to_huggingface(df, accuracy_df, repo_data, repo_results):
    """
    Upload the transformed data to HuggingFace repositories.
    """
    api = HfApi(token=HF_CONFIG["token"])

    # Create temporary directory for files
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Save main dataset
        data_path = os.path.join(tmp_dir, "data.csv")
        df.to_csv(data_path, index=False)

        # Save accuracy summary
        results_path = os.path.join(tmp_dir, "results.csv")
        accuracy_df.to_csv(results_path, index=False)

        # Upload to data repo
        api.upload_file(path_or_fileobj=data_path, path_in_repo="data.csv", repo_id=repo_data, repo_type="dataset")

        # Upload to results repo
        api.upload_file(path_or_fileobj=results_path, path_in_repo="results.csv", repo_id=repo_results, repo_type="dataset")

    print(f"✅ Uploaded data to {repo_data}")
    print(f"✅ Uploaded results to {repo_results}")


def main():
    """Main pipeline function"""
    print("🚀 Starting database to HuggingFace pipeline...")

    # Step 1: Extract from database (same as convert_to_csv.py)
    print("📊 Extracting data from database...")
    session = next(get_session())
    try:
        df = extract_events_and_predictions(session)
        print(f"   Found {len(df)} event-prediction pairs")
    finally:
        session.close()

    if len(df) == 0:
        print("❌ No data found in database")
        return

    # Step 2: Transform to standard format
    print("🔄 Transforming data...")
    df, accuracy_df = transform_to_standard_format(df)
    print(f"   Processed {len(df)} records")
    print(f"   Generated accuracy stats for {len(accuracy_df)} model-task pairs")

    # Step 3: Upload to HuggingFace
    if HF_CONFIG["token"]:
        print("☁️ Uploading to HuggingFace...")
        upload_to_huggingface(df, accuracy_df, HF_CONFIG["data_repo"], HF_CONFIG["results_repo"])
    else:
        print("⚠️ No HF_TOKEN found, saving locally instead...")
        df.to_csv("data_export.csv", index=False)
        accuracy_df.to_csv("results_export.csv", index=False)
        print("   Saved data_export.csv and results_export.csv")

    print("✅ Pipeline completed successfully!")


if __name__ == "__main__":
    main()
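# Illustrative usage (the script filename here is hypothetical):
#   HF_TOKEN=hf_xxx python db_to_hf.py
# If no token is configured in HF_CONFIG, the pipeline falls back to writing
# data_export.csv and results_export.csv locally instead of uploading.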