sivapriya175
deploy backend files
27e29a2
import os

import numpy as np
import pandas as pd
from fastapi import HTTPException
from groq import Groq

from models.train_model import (
    load_and_preprocess_data,
    train_team_performance_model,
    train_player_score_model,
    predict_player_score,
    predict_team_performance,
)
# Module-level cache for the trained models and the data frames they use.
# Every slot starts out as None and is filled in by initialize_models().

# Team-level artifacts.
TEAM_WIN_MODEL = TEAM_SCORE_MODEL = None
TEAM_DATA = TEAM_SCALER = None

# Player-level artifacts.
PLAYER_SCORE_MODEL = PLAYER_SCALER = PLAYER_DATA = None

# Data frames returned by load_and_preprocess_data().
MATCH_DF = BALL_DF = None
# Initialize Groq client.
# The API key is read from the GROQ_API_KEY environment variable so that no
# credential is stored in source control.
# NOTE: the key that used to be hard-coded here has been exposed in the
# repository history and should be revoked and replaced.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
# Without a key the client is left as None instead of failing the import;
# generate_summary() wraps its client call in a broad `except`, so summaries
# are then reported as unavailable while the rest of the API keeps working.
client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None
# Load data and train models at startup
def initialize_models():
    """Load the datasets, train every model and cache the results in globals.

    Fills MATCH_DF and BALL_DF from ``load_and_preprocess_data()``, the four
    TEAM_* globals from ``train_team_performance_model(MATCH_DF)`` and the three
    PLAYER_* globals from ``train_player_score_model(MATCH_DF, BALL_DF)``, then
    prints a confirmation message.
    """
    global MATCH_DF, BALL_DF
    global TEAM_WIN_MODEL, TEAM_SCORE_MODEL, TEAM_DATA, TEAM_SCALER
    global PLAYER_SCORE_MODEL, PLAYER_SCALER, PLAYER_DATA

    frames = load_and_preprocess_data()
    MATCH_DF, BALL_DF = frames

    team_artifacts = train_team_performance_model(MATCH_DF)
    TEAM_WIN_MODEL, TEAM_SCORE_MODEL, TEAM_DATA, TEAM_SCALER = team_artifacts

    player_artifacts = train_player_score_model(MATCH_DF, BALL_DF)
    PLAYER_SCORE_MODEL, PLAYER_SCALER, PLAYER_DATA = player_artifacts

    print("Models trained and loaded into memory.")
# Call this at app startup (see main.py below)
initialize_models()


def _most_common_value(values):
    """Return the most frequent entry of *values*, or None if it has no mode."""
    # Compute the mode once per group instead of twice.
    modes = values.mode()
    return modes.iloc[0] if len(modes) > 0 else None


# Player-team mapping: each striker -> the batting team recorded for them
# most often in BALL_DF (None when no team value is available).
player_team_mapping = (
    BALL_DF.groupby('striker')['batting_team'].agg(_most_common_value).to_dict()
)
# Clean JSON data (unchanged)
def clean_json(data):
    """Recursively convert *data* into plain, JSON-serialisable Python values.

    Conversion rules:
      * dict               -> dict with every value cleaned (keys are kept as-is)
      * list / tuple       -> list with every item cleaned
      * ndarray / Series / Index -> cleaned result of ``.tolist()``
      * bool / numpy bool  -> bool
      * int / numpy int    -> int
      * float / numpy float -> float, with NaN and +/-inf replaced by 0.0
      * None, NaT, pd.NA   -> None
      * pd.Timestamp       -> 'YYYY-MM-DD' string
      * anything else      -> ``str(data)``

    NumPy scalars are handled explicitly because most of them are not
    instances of the built-in ``int``/``bool`` types and would otherwise be
    turned into strings by the final fallback.
    """
    if isinstance(data, dict):
        return {key: clean_json(value) for key, value in data.items()}
    if isinstance(data, (list, tuple)):
        return [clean_json(item) for item in data]
    if isinstance(data, (np.ndarray, pd.Series, pd.Index)):
        # tolist() yields a (nested) list, or a bare scalar for 0-d arrays;
        # either way the result is cleaned by the branches of this function.
        return clean_json(data.tolist())
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(data, (bool, np.bool_)):
        return bool(data)
    if isinstance(data, (int, np.integer)):
        return int(data)
    if isinstance(data, (float, np.floating)):
        number = float(data)
        # NaN / inf are not valid JSON numbers; report them as 0.0.
        return number if np.isfinite(number) else 0.0
    if pd.isna(data):
        return None
    if isinstance(data, pd.Timestamp):
        # NaT was already mapped to None by the pd.isna() check above.
        return data.strftime('%Y-%m-%d')
    return str(data)
# Summary generation (unchanged)
def generate_summary(data, context_type):
    """Ask the Groq chat model for a short summary of *data*.

    The user prompt depends on *context_type*:
      * "match_history" reads ``data['team1']``, ``data['team2']`` and
        ``data['matches']``;
      * "player_stats", "team_stats", "prediction_score" and
        "prediction_team" embed *data* as a whole;
      * any other value leaves the prompt empty.

    Returns the stripped text of the first completion choice. Any exception
    raised by the client call is caught and reported in the returned string
    ("Summary unavailable due to error: ...") rather than propagated.
    """
    # (context type, wording used inside the generic prompt)
    simple_subjects = (
        ("player_stats", "player data"),
        ("team_stats", "team data"),
        ("prediction_score", "prediction"),
        ("prediction_team", "team prediction"),
    )

    if context_type == "match_history":
        user_prompt = (
            f"Summarize this match history between {data['team1']} and {data['team2']} "
            f"in one sentence: {data['matches']}"
        )
    else:
        subject = next(
            (wording for kind, wording in simple_subjects if context_type == kind),
            None,
        )
        if subject is None:
            user_prompt = ""
        else:
            user_prompt = f"Summarize this {subject} in one sentence: {data}"

    conversation = [
        {"role": "system", "content": "You are a concise cricket analyst."},
        {"role": "user", "content": user_prompt},
    ]

    try:
        completion = client.chat.completions.create(
            model="mixtral-8x7b-32768",
            messages=conversation,
            max_tokens=50,
            temperature=0.7,
        )
        reply = completion.choices[0].message.content
        return reply.strip()
    except Exception as error:
        return f"Summary unavailable due to error: {error}"
# Player stats (unchanged except using global BALL_DF)
def get_player_stats(player_name: str, season: str = None, role: str = "Batting"):
    """Return aggregate batting or bowling statistics for one player.

    Args:
        player_name: Name to look up. It is stripped and title-cased, and is
            also tried with spaces removed and with the word order reversed.
        season: Optional season filter, applied only when BALL_DF has a
            'season' column. ``None`` means all seasons.
        role: "Batting" or "Bowling".

    Returns:
        A dict of statistics (already passed through ``clean_json``) that also
        contains a generated "summary" string.

    Raises:
        HTTPException: 400 when *role* is not "Batting" or "Bowling" (the
            function previously fell through and returned ``None``);
            404 when no delivery matches the player (and season).
    """
    if role not in ("Batting", "Bowling"):
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported role '{role}'. Use 'Batting' or 'Bowling'.",
        )

    player_name = player_name.strip().title()
    name_variations = [
        player_name,
        player_name.replace(" ", ""),
        " ".join(reversed(player_name.split())),
    ]

    # Every delivery in which the player appears as striker or as bowler.
    player_data = BALL_DF[
        BALL_DF['striker'].isin(name_variations) | BALL_DF['bowler'].isin(name_variations)
    ]
    if season and 'season' in BALL_DF.columns:
        player_data = player_data[player_data['season'] == season]
    if player_data.empty:
        raise HTTPException(
            status_code=404,
            detail=f"Player '{player_name}' not found. Variations tried: {name_variations}",
        )

    season_label = season if season else "All Seasons"

    if role == "Batting":
        batting_data = player_data[player_data['striker'].isin(name_variations)]
        total_runs = int(batting_data['runs_off_bat'].sum())
        # One row per delivery, so the row count is the number of balls faced.
        balls_faced = int(batting_data.shape[0])
        strike_rate = float((total_runs / balls_faced * 100) if balls_faced > 0 else 0)
        matches_played = int(len(batting_data['match_id'].unique()))
        stats = {
            "player_name": player_name,
            "role": role,
            "total_runs": total_runs,
            "balls_faced": balls_faced,
            "strike_rate": strike_rate,
            "matches_played": matches_played,
            "season": season_label,
        }
    else:  # role == "Bowling"
        bowling_data = player_data[player_data['bowler'].isin(name_variations)]
        # Only these dismissal kinds are counted as wickets for the bowler.
        bowler_wicket_types = ["caught", "bowled", "lbw", "caught and bowled", "hit wicket"]
        wickets_data = bowling_data[
            bowling_data['player_dismissed'].notna()
            & bowling_data['wicket_type'].isin(bowler_wicket_types)
        ]
        total_wickets = int(wickets_data.shape[0])
        total_runs_conceded = int(bowling_data['total_runs'].sum())
        total_balls_bowled = int(bowling_data.shape[0])
        total_overs_bowled = float(total_balls_bowled / 6)

        # Average and strike rate are undefined without wickets; report 0.0.
        if total_wickets > 0:
            bowling_average = round(total_runs_conceded / total_wickets, 2)
            bowling_strike_rate = round(total_balls_bowled / total_wickets, 2)
        else:
            bowling_average = 0.0
            bowling_strike_rate = 0.0
        if total_overs_bowled > 0:
            economy_rate = round(total_runs_conceded / total_overs_bowled, 2)
        else:
            economy_rate = 0.0

        stats = {
            "player_name": player_name,
            "role": role,
            "total_wickets": total_wickets,
            "bowling_average": bowling_average,
            "economy_rate": economy_rate,
            "bowling_strike_rate": bowling_strike_rate,
            "overs_bowled": round(total_overs_bowled, 1),
            "bowling_matches": int(len(bowling_data['match_id'].unique())),
            "season": season_label,
        }

    stats["summary"] = generate_summary(stats, "player_stats")
    return clean_json(stats)
# Team stats (unchanged except using global MATCH_DF)
def get_team_stats(team_name: str, season: str = None):
    """Return win/loss totals for a team, optionally limited to one season.

    The team name is stripped and title-cased before it is compared with the
    'team1', 'team2' and 'winner' columns of MATCH_DF. The season filter is
    applied only when MATCH_DF has a 'season' column.

    Returns:
        A cleaned dict with total matches, wins, losses, win percentage, the
        season label and a generated "summary".

    Raises:
        HTTPException: 404 when no match involves the team (and season).
    """
    team_name = team_name.strip().title()

    involves_team = (MATCH_DF['team1'] == team_name) | (MATCH_DF['team2'] == team_name)
    team_matches = MATCH_DF[involves_team]
    if season and 'season' in MATCH_DF.columns:
        in_season = team_matches['season'] == season
        team_matches = team_matches[in_season]

    if team_matches.empty:
        raise HTTPException(status_code=404, detail="Team not found")

    total_matches = int(len(team_matches))
    wins = int((team_matches['winner'] == team_name).sum())
    if total_matches > 0:
        win_percentage = float(wins / total_matches * 100)
    else:
        win_percentage = float(0)

    stats = {
        "total_matches": total_matches,
        "wins": wins,
        "losses": total_matches - wins,
        "win_percentage": win_percentage,
        "season": season or "All Seasons",
    }
    stats["summary"] = generate_summary(stats, "team_stats")
    return clean_json(stats)
# Match history (unchanged except using global MATCH_DF)
def get_match_history(team1: str, team2: str, season: str = None):
    """Return the head-to-head match list between two teams.

    Both names are stripped and title-cased. Matches are selected regardless
    of which side is recorded as 'team1' or 'team2' in MATCH_DF; the season
    filter is applied only when MATCH_DF has a 'season' column.

    Returns:
        A cleaned dict with "team1", "team2", "season", "matches" (a list of
        records with date, team1, team2 and winner) and a generated "summary".
        Dates are formatted as 'YYYY-MM-DD'; a missing winner is shown as
        "Draw".

    Raises:
        HTTPException: 404 when either team is unknown or when the two teams
            have no matches (in the given season).
    """
    team1 = team1.strip().title()
    team2 = team2.strip().title()

    available_teams = set(MATCH_DF['team1'].unique().tolist() + MATCH_DF['team2'].unique().tolist())
    if team1 not in available_teams or team2 not in available_teams:
        raise HTTPException(
            status_code=404,
            detail=f"Team {team1 if team1 not in available_teams else team2} not found.",
        )

    head_to_head = (
        ((MATCH_DF['team1'] == team1) & (MATCH_DF['team2'] == team2))
        | ((MATCH_DF['team1'] == team2) & (MATCH_DF['team2'] == team1))
    )
    team_matches = MATCH_DF[head_to_head]
    if season and 'season' in MATCH_DF.columns:
        team_matches = team_matches[team_matches['season'] == season]
    if team_matches.empty:
        raise HTTPException(
            status_code=404,
            detail=f"No match history found between {team1} and {team2}.",
        )

    # Copy AFTER all filtering (and only the columns that are returned) so the
    # column assignments below operate on an independent frame rather than on
    # a filtered slice, which pandas flags with SettingWithCopyWarning.
    team_matches = team_matches[['date', 'team1', 'team2', 'winner']].copy()

    team_matches['date'] = team_matches['date'].apply(
        lambda value: value.strftime('%Y-%m-%d') if pd.notna(value) else None
    )
    team_matches['winner'] = team_matches['winner'].fillna("Draw")
    for column in ['team1', 'team2', 'winner']:
        team_matches[column] = team_matches[column].apply(
            lambda value: str(value) if pd.notna(value) else None
        )

    response = {
        "team1": team1,
        "team2": team2,
        "season": season if season else "All Seasons",
        "matches": team_matches.to_dict(orient='records'),
    }
    response["summary"] = generate_summary(response, "match_history")
    return clean_json(response)
# Prediction functions using in-memory models
def predict_score(player_name: str, opposition_team: str):
    """Predict a player's runs against an opposition team.

    The player name is normalised ('+' replaced by a space, stripped,
    title-cased) and looked up in ``player_team_mapping``, also trying the
    name without spaces and with the word order reversed. The player's team
    from that mapping is passed to ``predict_player_score`` together with the
    in-memory player model, scaler and data; venue, city and toss details are
    passed as ``None``.

    Returns:
        A cleaned dict with "player", "team", "opposition", "predicted_runs"
        and a generated "summary".

    Raises:
        HTTPException: 404 when the player has no team in the historical
            mapping (previously reported as a 500 by the blanket handler);
            500 when the prediction itself fails.
    """
    player_name = player_name.strip().replace("+", " ").title()
    name_variations = [
        player_name,
        player_name.replace(" ", ""),
        " ".join(reversed(player_name.split())),
    ]

    player_team = None
    for candidate in name_variations:
        if candidate in player_team_mapping:
            player_team = player_team_mapping[candidate]
            # Continue with the spelling that is actually present in the data.
            player_name = candidate
            break
    if not player_team:
        # An unknown player is a client-side lookup failure, not a server error.
        raise HTTPException(
            status_code=404,
            detail=f"Player {player_name} not found in historical data",
        )

    try:
        predicted_runs = predict_player_score(
            player=player_name,
            team=player_team,
            opponent=opposition_team,
            venue=None,
            city=None,
            toss_winner=None,
            toss_decision=None,
            score_model=PLAYER_SCORE_MODEL,
            scaler=PLAYER_SCALER,
            player_data=PLAYER_DATA,
        )
        stats = {
            "player": player_name,
            "team": player_team,
            "opposition": opposition_team,
            "predicted_runs": predicted_runs["expected_score"],
        }
        stats["summary"] = generate_summary(stats, "prediction_score")
        return clean_json(stats)
    except Exception as e:
        # Boundary handler: surface any model failure as a 500 and keep the
        # original exception attached as the cause.
        raise HTTPException(
            status_code=500,
            detail=f"Error predicting score for {player_name} against {opposition_team}: {str(e)}",
        ) from e
def predict_team_outcome(team1: str, team2: str):
    """Predict the outcome of a match between *team1* and *team2*.

    Calls ``predict_team_performance`` with the in-memory team models, data
    and scaler; venue, city and toss details are passed as ``None``. A
    generated "summary" is added to the prediction before it is cleaned and
    returned.
    """
    # Match context that is not supplied by the caller.
    unspecified_context = {
        "venue": None,
        "city": None,
        "toss_winner": None,
        "toss_decision": None,
    }
    # In-memory artifacts held in the module-level TEAM_* globals.
    trained_artifacts = {
        "win_model": TEAM_WIN_MODEL,
        "score_model": TEAM_SCORE_MODEL,
        "data": TEAM_DATA,
        "scaler": TEAM_SCALER,
    }

    prediction = predict_team_performance(
        team1=team1,
        team2=team2,
        **unspecified_context,
        **trained_artifacts,
    )
    prediction["summary"] = generate_summary(prediction, "prediction_team")
    return clean_json(prediction)
# Utility functions (unchanged except using global dataframes)
def get_teams():
    """Return the sorted, de-duplicated team names from MATCH_DF's 'team1' and 'team2' columns."""
    first_listed = MATCH_DF['team1'].unique().tolist()
    second_listed = MATCH_DF['team2'].unique().tolist()
    team_names = set(first_listed).union(second_listed)
    return clean_json({"teams": sorted(team_names)})
def get_players():
    """Return the sorted, de-duplicated, non-null 'striker' names from BALL_DF."""
    strikers = BALL_DF['striker'].dropna()
    player_names = set(strikers.unique().tolist())
    return clean_json({"players": sorted(player_names)})
def get_seasons():
    """Return "All Seasons" followed by the sorted non-null seasons in MATCH_DF."""
    known_seasons = MATCH_DF['season'].dropna().unique().tolist()
    return clean_json({"seasons": ["All Seasons", *sorted(known_seasons)]})
# Team trends (unchanged except using global MATCH_DF)
def get_team_trends(team_name: str):
    """Return per-season win statistics for a team.

    The team name is stripped and title-cased. Seasons are visited in the
    order given by ``MATCH_DF['season'].unique()``; seasons in which the team
    has no matches are left out.

    Returns:
        ``{"team_name": ..., "trends": [...]}`` where each trend entry holds
        the season, wins, total matches and win percentage.

    Raises:
        HTTPException: 404 when no match involves the team.
    """
    team_name = team_name.strip().title()

    involves_team = (MATCH_DF['team1'] == team_name) | (MATCH_DF['team2'] == team_name)
    team_matches = MATCH_DF[involves_team]
    if team_matches.empty:
        raise HTTPException(status_code=404, detail="Team not found")

    trends = []
    for season in MATCH_DF['season'].unique():
        season_matches = team_matches[team_matches['season'] == season]
        if season_matches.empty:
            continue

        total_matches = len(season_matches)
        wins = len(season_matches[season_matches['winner'] == team_name])
        if total_matches > 0:
            win_percentage = wins / total_matches * 100
        else:
            win_percentage = 0

        trends.append({
            "season": season,
            "wins": wins,
            "total_matches": total_matches,
            "win_percentage": win_percentage,
        })

    return {"team_name": team_name, "trends": trends}
# Player trends (unchanged except using global BALL_DF)
def get_player_trends(player_name: str, role: str = "Batting"):
    """Return per-season batting or bowling figures for one player.

    Args:
        player_name: Name to look up. It is stripped and title-cased, and is
            also tried with spaces removed and with the word order reversed.
        role: "Batting" or "Bowling".

    Returns:
        ``{"player_name": ..., "role": ..., "trends": [...]}`` with one entry
        per season in which the player has deliveries in the requested role.
        Seasons follow the order of ``BALL_DF['season'].unique()``.

    Raises:
        HTTPException: 400 when *role* is not "Batting" or "Bowling";
            404 when the player does not appear in BALL_DF at all.
    """
    if role not in ("Batting", "Bowling"):
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported role '{role}'. Use 'Batting' or 'Bowling'.",
        )

    player_name = player_name.strip().title()
    name_variations = [
        player_name,
        player_name.replace(" ", ""),
        " ".join(reversed(player_name.split())),
    ]

    player_data = BALL_DF[
        BALL_DF['striker'].isin(name_variations) | BALL_DF['bowler'].isin(name_variations)
    ]
    if player_data.empty:
        raise HTTPException(status_code=404, detail=f"Player '{player_name}' not found")

    # Restrict to deliveries where the player acts in the requested role.
    # Without this, batting totals would also include the deliveries the
    # player bowled (and vice versa), since player_data holds both.
    role_column = 'striker' if role == "Batting" else 'bowler'
    role_data = player_data[player_data[role_column].isin(name_variations)]

    # Dismissal kinds counted as wickets for the bowler (same list as
    # get_player_stats uses).
    bowler_wicket_types = ["caught", "bowled", "lbw", "caught and bowled", "hit wicket"]

    trends = []
    for season in BALL_DF['season'].unique():
        season_data = role_data[role_data['season'] == season]
        if season_data.empty:
            continue

        matches_played = int(len(season_data['match_id'].unique()))

        if role == "Batting":
            total_runs = int(season_data['runs_off_bat'].sum())
            # One row per delivery, so the row count is the balls faced.
            balls_faced = int(season_data.shape[0])
            strike_rate = float((total_runs / balls_faced * 100) if balls_faced > 0 else 0)
            trends.append({
                "season": season,
                "total_runs": total_runs,
                "strike_rate": strike_rate,
                "matches_played": matches_played,
            })
        else:  # role == "Bowling"
            wickets_data = season_data[
                season_data['player_dismissed'].notna()
                & season_data['wicket_type'].isin(bowler_wicket_types)
            ]
            total_wickets = int(wickets_data.shape[0])
            total_runs_conceded = int(season_data['total_runs'].sum())
            total_overs_bowled = float(season_data.shape[0] / 6)
            # The average is undefined without wickets; report 0.0 (as
            # get_player_stats does) because infinity is not valid JSON.
            if total_wickets > 0:
                bowling_average = float(total_runs_conceded / total_wickets)
            else:
                bowling_average = 0.0
            if total_overs_bowled > 0:
                economy_rate = float(total_runs_conceded / total_overs_bowled)
            else:
                economy_rate = 0.0
            trends.append({
                "season": season,
                "total_wickets": total_wickets,
                "bowling_average": bowling_average,
                # Key fixed: it was misspelled "economy economy_rate".
                "economy_rate": economy_rate,
                "matches_played": matches_played,
            })

    return {"player_name": player_name, "role": role, "trends": trends}