|
import pandas as pd |
|
import numpy as np |
|
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingRegressor |
|
from sklearn.multioutput import MultiOutputRegressor |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.metrics import mean_squared_error, accuracy_score, r2_score |
|
from sklearn.preprocessing import StandardScaler |
|
|
|
|
|
def load_and_preprocess_data():
    """Load the ball-by-ball and match CSVs and engineer match-level features.

    Reads ``data/cleaned_ball_data.csv`` and ``data/cleaned_match_data.csv``
    (relative to the current working directory), keeps only matches dated
    2015-2022 inclusive, and adds the following columns to the match table:

    * ``team1_total`` / ``team2_total`` -- sum of ``total_runs`` for that
      batting side in that match; missing values are replaced by the column
      mean and the result is capped at 500.
    * ``venue_index``, ``city_index``, ``toss_winner_index`` -- pandas
      category codes (``-1`` for a missing label).
    * ``toss_decision_index`` -- 1 for ``'bat'``, 0 for anything else.
    * ``date_numeric`` -- whole days since 1970-01-01.
    * ``team1_win_rate`` / ``team2_win_rate`` -- recency-weighted wins divided
      by matches played, per team.
    * ``h2h_win_rate`` -- share of (team1, team2) fixtures won by team1.

    Returns:
        tuple: ``(match_df, ball_df)``. ``ball_df`` is the complete ball table
        with ``start_date`` parsed; it is *not* restricted to 2015-2022.
    """
    ball_df = pd.read_csv(
        'data/cleaned_ball_data.csv',
        dtype={
            'match_id': str, 'season': str, 'start_date': str, 'venue': str,
            'innings': int, 'ball': float, 'batting_team': str, 'bowling_team': str,
            'striker': str, 'non_striker': str, 'bowler': str, 'runs_off_bat': int,
            'extras': int, 'wides': float, 'noballs': float, 'byes': float,
            'legbyes': float, 'penalty': float, 'wicket_type': str,
            'player_dismissed': str, 'other_wicket_type': str,
            'other_player_dismissed': str, 'cricsheet_id': str, 'total_runs': int,
        },
    )
    match_df = pd.read_csv(
        'data/cleaned_match_data.csv',
        dtype={
            'id': str, 'season': str, 'city': str, 'date': str,
            'team1': str, 'team2': str, 'toss_winner': str, 'toss_decision': str,
            'result': str, 'dl_applied': int, 'winner': str,
            'win_by_runs': float, 'win_by_wickets': float, 'player_of_match': str,
            'venue': str, 'umpire1': str, 'umpire2': str, 'umpire3': str,
        },
    )

    # Unparseable dates become NaT and are excluded by the year filter below.
    match_df['date'] = pd.to_datetime(match_df['date'], errors='coerce')
    ball_df['start_date'] = pd.to_datetime(ball_df['start_date'], errors='coerce')

    match_year = match_df['date'].dt.year
    match_df = match_df[(match_year >= 2015) & (match_year <= 2022)].copy()

    # Innings totals per (match, batting side).
    team_scores = (
        ball_df.groupby(['match_id', 'batting_team'])['total_runs']
        .sum()
        .reset_index()
        .rename(columns={'total_runs': 'team_total'})
    )

    for side in ('team1', 'team2'):
        total_col = f'{side}_total'
        match_df = match_df.merge(
            team_scores,
            left_on=['id', side],
            right_on=['match_id', 'batting_team'],
            how='left',
        )
        # Drop the right-hand key columns straight away: if they survive into
        # the second merge pandas suffixes them (_x/_y) and they can no longer
        # be removed by name.
        match_df = match_df.drop(columns=['match_id', 'batting_team'], errors='ignore')
        match_df = match_df.rename(columns={'team_total': total_col})
        match_df[total_col] = match_df[total_col].fillna(match_df[total_col].mean())

    # Integer encodings of the categorical match attributes.
    match_df['venue_index'] = match_df['venue'].astype('category').cat.codes
    match_df['city_index'] = match_df['city'].astype('category').cat.codes
    match_df['toss_winner_index'] = match_df['toss_winner'].astype('category').cat.codes
    match_df['toss_decision_index'] = (
        match_df['toss_decision'].map({'bat': 1, 'field': 0}).fillna(0).astype(int)
    )

    # Recency-weighted win rate per team.
    match_df['date_numeric'] = (match_df['date'] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1d')
    latest_day = match_df['date_numeric'].max()

    win_frames = []
    played_frames = []
    for side in ('team1', 'team2'):
        won = match_df[match_df['winner'] == match_df[side]]
        side_wins = won.groupby(side).agg({'date_numeric': 'mean', 'id': 'count'}).reset_index()
        win_frames.append(
            side_wins.rename(columns={'id': 'wins', 'date_numeric': 'win_date', side: 'team'})
        )
        side_played = match_df.groupby(side).size().reset_index(name='matches')
        played_frames.append(side_played.rename(columns={side: 'team'}))

    # 'win_date' is the mean of the team1-side and team2-side mean win dates.
    team_wins = (
        pd.concat(win_frames).groupby('team').agg({'wins': 'sum', 'win_date': 'mean'}).reset_index()
    )
    team_matches = pd.concat(played_frames).groupby('team')['matches'].sum().reset_index()

    # Teams without a win get wins == 0, so their weighted wins are 0 whatever
    # placeholder win_date the fill leaves behind.
    team_win_rates = team_matches.merge(team_wins, on='team', how='left').fillna(0)
    decay = np.exp(-0.1 * (latest_day - team_win_rates['win_date']) / 365)
    team_win_rates['weighted_wins'] = team_win_rates['wins'] * decay
    team_win_rates['win_rate'] = (
        team_win_rates['weighted_wins'] / team_win_rates['matches']
    ).fillna(0)

    rates = team_win_rates[['team', 'win_rate']]
    for side in ('team1', 'team2'):
        match_df = match_df.merge(
            rates.rename(columns={'team': side, 'win_rate': f'{side}_win_rate'}),
            on=side,
            how='left',
        )

    # Head-to-head: fraction of (team1, team2) fixtures won by team1.
    pair_keys = ['team1', 'team2']
    h2h = match_df.groupby(pair_keys).size().reset_index(name='h2h_matches')
    h2h_wins = (
        match_df[match_df['winner'] == match_df['team1']]
        .groupby(pair_keys)
        .size()
        .reset_index(name='h2h_wins')
    )
    h2h = h2h.merge(h2h_wins, on=pair_keys, how='left').fillna(0)
    h2h['h2h_win_rate'] = h2h['h2h_wins'] / h2h['h2h_matches']

    match_df = match_df.merge(h2h[pair_keys + ['h2h_win_rate']], on=pair_keys, how='left')
    # NOTE(review): this fill covers every column, so missing text fields
    # (e.g. 'winner', 'city', 'umpire3') become the integer 0 as well. It is
    # kept because downstream code may rely on the frame being NaN-free;
    # confirm whether only 'h2h_win_rate' was meant to be filled.
    match_df = match_df.fillna(0)

    # Cap implausibly large innings totals.
    match_df['team1_total'] = match_df['team1_total'].clip(upper=500)
    match_df['team2_total'] = match_df['team2_total'].clip(upper=500)

    return match_df, ball_df
|
|
|
|
|
def train_team_performance_model(match_df):
    """Train the match-winner classifier and the two-innings score regressor.

    Args:
        match_df: Match table containing at least ``team1``, ``team2``,
            ``winner``, ``team1_total``, ``team2_total``, ``venue_index``,
            ``city_index``, ``toss_winner_index``, ``toss_decision_index``,
            ``dl_applied``, ``team1_win_rate``, ``team2_win_rate`` and
            ``h2h_win_rate``. Rows with a missing value in any of these are
            dropped. ``venue``, ``city`` and ``toss_winner`` are carried along
            when present.

    Returns:
        tuple: ``(win_model, score_model, data, scaler)`` where

        * ``win_model`` is a ``RandomForestClassifier`` predicting whether
          team1 wins, fitted on an 80% split of the rows;
        * ``score_model`` is a ``MultiOutputRegressor`` predicting
          ``[team1_total, team2_total]``, fitted on all rows;
        * ``data`` is the training frame plus ``team1_index``,
          ``team2_index`` and ``winner_index``;
        * ``scaler`` is the ``StandardScaler`` fitted on the eight non-team
          features.

        Both models expect the eight scaled features followed by
        ``team1_index`` and ``team2_index``, in that column order.
    """
    required_cols = [
        'team1', 'team2', 'winner', 'team1_total', 'team2_total', 'venue_index', 'city_index',
        'toss_winner_index', 'toss_decision_index', 'dl_applied', 'team1_win_rate',
        'team2_win_rate', 'h2h_win_rate',
    ]
    # Keep the raw labels alongside their codes so prediction-time code can
    # translate a venue/city/toss-winner name into the code used in training.
    label_cols = [col for col in ('venue', 'city', 'toss_winner') if col in match_df.columns]
    data = match_df[required_cols + label_cols].dropna(subset=required_cols).copy()

    # Team codes are assigned independently for the team1 and team2 roles.
    data['team1_index'] = data['team1'].astype('category').cat.codes
    data['team2_index'] = data['team2'].astype('category').cat.codes
    data['winner_index'] = (data['winner'] == data['team1']).astype(int)

    scaled_cols = ['venue_index', 'city_index', 'toss_winner_index', 'toss_decision_index',
                   'dl_applied', 'team1_win_rate', 'team2_win_rate', 'h2h_win_rate']
    unscaled = data[scaled_cols].copy()
    # The head-to-head rate is doubled before scaling.
    unscaled['h2h_win_rate'] = unscaled['h2h_win_rate'] * 2

    y_win = data['winner_index']
    y_score = data[['team1_total', 'team2_total']]

    scaler = StandardScaler()
    # Re-use data's index: a default RangeIndex would misalign the team index
    # columns below (and fill them with NaN) whenever dropna() removed a row.
    X_scaled = pd.DataFrame(scaler.fit_transform(unscaled), columns=scaled_cols, index=data.index)
    X_scaled['team1_index'] = data['team1_index']
    X_scaled['team2_index'] = data['team2_index']

    # The held-out 20% is not evaluated here; the split only determines which
    # rows the classifier is fitted on.
    X_train, _X_test, y_train, _y_test = train_test_split(
        X_scaled, y_win, test_size=0.2, random_state=42
    )

    win_model = RandomForestClassifier(
        n_estimators=200, max_depth=15, random_state=42, class_weight='balanced'
    )
    win_model.fit(X_train, y_train)

    base_score_model = HistGradientBoostingRegressor(random_state=42, learning_rate=0.1, max_iter=100)
    score_model = MultiOutputRegressor(base_score_model)
    score_model.fit(X_scaled, y_score)

    return win_model, score_model, data, scaler
|
|
|
|
|
def train_player_score_model(match_df, ball_df):
    """Train a regressor for the runs a striker scores in a match.

    Args:
        match_df: Match table with ``id``, ``team1``, ``team2``,
            ``team1_win_rate``, ``team2_win_rate``, ``venue``, ``city``,
            ``toss_winner`` and ``toss_decision`` columns.
        ball_df: Ball table with ``match_id``, ``striker``, ``batting_team``
            and ``runs_off_bat`` columns.

    Returns:
        tuple: ``(score_model, scaler, player_data)`` where ``score_model`` is
        a ``HistGradientBoostingRegressor`` fitted on the scaled features
        ``player_avg``, ``team_win_rate``, ``venue_index``, ``city_index``,
        ``toss_winner_index`` and ``toss_decision_index``; ``scaler`` is the
        ``StandardScaler`` fitted on those features; and ``player_data`` is
        the per-(match, striker, batting team) frame they were derived from.
    """
    player_runs = (
        ball_df.groupby(['match_id', 'striker', 'batting_team'])['runs_off_bat']
        .sum()
        .reset_index()
        .rename(columns={'runs_off_bat': 'player_total'})
    )
    # Left join: strikers from matches absent in match_df keep NaN match fields.
    player_data = player_runs.merge(match_df, left_on='match_id', right_on='id', how='left')

    player_data['player_avg'] = player_data.groupby('striker')['player_total'].transform('mean')

    # For a row whose batting side is team1, use the mean team1_win_rate over
    # all rows where that team is team1; otherwise use the mean team2_win_rate
    # over rows where it is team2. One groupby per role replaces re-filtering
    # the whole frame for every row. sort=False avoids ordering the keys.
    rate_as_team1 = player_data.groupby('team1', sort=False)['team1_win_rate'].mean()
    rate_as_team2 = player_data.groupby('team2', sort=False)['team2_win_rate'].mean()
    batting_team = player_data['batting_team']
    bats_as_team1 = batting_team == player_data['team1']
    player_data['team_win_rate'] = batting_team.map(rate_as_team1).where(
        bats_as_team1, batting_team.map(rate_as_team2)
    )

    # Category codes; a missing label is encoded as -1.
    player_data['venue_index'] = player_data['venue'].astype('category').cat.codes
    player_data['city_index'] = player_data['city'].astype('category').cat.codes
    player_data['toss_winner_index'] = player_data['toss_winner'].astype('category').cat.codes
    player_data['toss_decision_index'] = (
        player_data['toss_decision'].map({'bat': 1, 'field': 0}).fillna(0).astype(int)
    )

    feature_cols = ['player_avg', 'team_win_rate', 'venue_index', 'city_index',
                    'toss_winner_index', 'toss_decision_index']
    X = player_data[feature_cols].dropna()
    # Select the targets for exactly the rows that survived dropna().
    y = player_data.loc[X.index, 'player_total']

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    score_model = HistGradientBoostingRegressor(random_state=42, learning_rate=0.1, max_iter=100)
    score_model.fit(X_scaled, y)

    return score_model, scaler, player_data
|
|
|
|
|
def predict_player_score(player, team, opponent, venue=None, city=None, toss_winner=None, toss_decision=None,
                         score_model=None, scaler=None, player_data=None):
    """Predict the runs ``player`` is expected to score for ``team``.

    Args:
        player: Striker name; must occur in ``player_data['striker']``.
        team: Batting team; must occur in ``player_data['batting_team']``.
        opponent: Opposing team. It is echoed in the result only and is not
            used as a feature.
        venue, city, toss_winner: Optional labels. When one is omitted, or
            does not occur in ``player_data``, the mean of the corresponding
            ``*_index`` column is used instead.
        toss_decision: ``'bat'`` (1) or ``'field'`` (0); any other value uses
            the mean of ``player_data['toss_decision_index']``.
        score_model: Fitted regressor exposing ``predict``.
        scaler: Fitted scaler exposing ``transform``.
        player_data: Frame with ``striker``, ``batting_team``,
            ``player_total``, ``team_win_rate``, ``venue``, ``city``,
            ``toss_winner`` and the matching ``*_index`` columns.

    Returns:
        dict: ``player``, ``team``, ``opponent`` and ``expected_score``
        (rounded to 2 decimals). If anything fails, the error is printed and
        ``expected_score`` is ``0.0``.
    """
    feature_cols = ['player_avg', 'team_win_rate', 'venue_index', 'city_index',
                    'toss_winner_index', 'toss_decision_index']

    def _index_or_mean(label_col, index_col, label):
        # Code recorded for `label`, or the column mean when no label is given
        # or the label was never seen.
        fallback = player_data[index_col].mean()
        if not label:
            return fallback
        seen = player_data.loc[player_data[label_col] == label, index_col]
        return fallback if seen.empty else seen.iloc[0]

    try:
        if player not in player_data['striker'].values or team not in player_data['batting_team'].values:
            raise ValueError("Player or team not found in training data")

        player_avg = player_data.loc[player_data['striker'] == player, 'player_total'].mean()
        team_win_rate = player_data.loc[player_data['batting_team'] == team, 'team_win_rate'].mean()
        venue_index = _index_or_mean('venue', 'venue_index', venue)
        city_index = _index_or_mean('city', 'city_index', city)
        toss_winner_index = _index_or_mean('toss_winner', 'toss_winner_index', toss_winner)
        if toss_decision == 'bat':
            toss_decision_index = 1
        elif toss_decision == 'field':
            toss_decision_index = 0
        else:
            toss_decision_index = player_data['toss_decision_index'].mean()

        # Named columns keep the row consistent with a scaler that was fitted
        # on a DataFrame with these feature names.
        feature_row = pd.DataFrame(
            [[player_avg, team_win_rate, venue_index, city_index, toss_winner_index, toss_decision_index]],
            columns=feature_cols,
        )
        features = scaler.transform(feature_row)
        predicted_score = score_model.predict(features)[0]

        return {
            "player": player,
            "team": team,
            "opponent": opponent,
            "expected_score": round(float(predicted_score), 2)
        }
    except Exception as e:
        # Best-effort contract: callers always receive a dict, never an exception.
        print(f"Prediction error: {str(e)}")
        return {
            "player": player,
            "team": team,
            "opponent": opponent,
            "expected_score": 0.0
        }
|
|
|
def predict_team_performance(team1, team2, venue=None, city=None, toss_winner=None, toss_decision=None,
                             win_model=None, score_model=None, data=None, scaler=None):
    """Predict team1's win probability and both innings totals.

    Args:
        team1: Team in the team1 role; must occur in ``data['team1']``.
        team2: Team in the team2 role; must occur in ``data['team2']``.
        venue, city, toss_winner: Optional labels. The mean of the matching
            ``*_index`` column is used when the label is omitted, when
            ``data`` has no such label column, or when the label was never
            seen.
        toss_decision: ``'bat'`` (1) or ``'field'`` (0); any other value uses
            the mean of ``data['toss_decision_index']``.
        win_model: Fitted classifier exposing ``predict_proba``.
        score_model: Fitted regressor exposing ``predict`` and returning
            ``[team1_total, team2_total]`` per row.
        data: Training frame with ``team1``, ``team2``, ``team1_index``,
            ``team2_index``, the ``*_index`` feature columns, ``dl_applied``,
            ``team1_win_rate``, ``team2_win_rate`` and ``h2h_win_rate``.
        scaler: Scaler fitted on the eight non-team features.

    Returns:
        dict: ``team1``, ``team2``, ``win_probability_team1`` (percent) and
        ``expected_team1_score`` / ``expected_team2_score``, all rounded to 2
        decimals. If anything fails, the error is printed and the fallback
        values 50.0 / 0.0 / 0.0 are returned.
    """
    scaled_cols = ['venue_index', 'city_index', 'toss_winner_index', 'toss_decision_index',
                   'dl_applied', 'team1_win_rate', 'team2_win_rate', 'h2h_win_rate']

    def _index_or_mean(label_col, index_col, label):
        # Code recorded for `label`, or the column mean when no label is
        # given, the frame has no such label column, or the label is unseen.
        fallback = data[index_col].mean()
        if not label or label_col not in data.columns:
            return fallback
        seen = data.loc[data[label_col] == label, index_col]
        return fallback if seen.empty else seen.iloc[0]

    try:
        if team1 not in data['team1'].values or team2 not in data['team2'].values:
            raise ValueError("Team not found in training data")

        as_team1 = data[data['team1'] == team1]
        as_team2 = data[data['team2'] == team2]
        team1_index = as_team1['team1_index'].values[0]
        team2_index = as_team2['team2_index'].values[0]
        venue_index = _index_or_mean('venue', 'venue_index', venue)
        city_index = _index_or_mean('city', 'city_index', city)
        toss_winner_index = _index_or_mean('toss_winner', 'toss_winner_index', toss_winner)
        if toss_decision == 'bat':
            toss_decision_index = 1
        elif toss_decision == 'field':
            toss_decision_index = 0
        else:
            toss_decision_index = data['toss_decision_index'].mean()
        # NOTE(review): the D/L default is keyed on toss_decision, which looks
        # accidental; behaviour is left unchanged pending confirmation.
        dl_applied = 0 if pd.isna(toss_decision) else data['dl_applied'].mean()
        team1_win_rate = as_team1['team1_win_rate'].values[0]
        team2_win_rate = as_team2['team2_win_rate'].values[0]

        fixture = data[(data['team1'] == team1) & (data['team2'] == team2)]
        h2h_win_rate = fixture['h2h_win_rate'].values[0] if not fixture.empty else 0
        # Training doubles the head-to-head rate before scaling, so the same
        # transformation has to be applied here.
        h2h_win_rate = h2h_win_rate * 2

        unscaled_row = pd.DataFrame(
            [[venue_index, city_index, toss_winner_index, toss_decision_index, dl_applied,
              team1_win_rate, team2_win_rate, h2h_win_rate]],
            columns=scaled_cols,
        )
        scaled = scaler.transform(unscaled_row)

        # Column order must match training: the eight scaled features first,
        # then the two team indices.
        model_row = pd.DataFrame(scaled, columns=scaled_cols)
        model_row['team1_index'] = team1_index
        model_row['team2_index'] = team2_index

        win_probability = win_model.predict_proba(model_row)[:, 1][0] * 100
        predicted_scores = score_model.predict(model_row)[0]

        return {
            "team1": team1,
            "team2": team2,
            "win_probability_team1": round(float(win_probability), 2),
            "expected_team1_score": round(float(predicted_scores[0]), 2),
            "expected_team2_score": round(float(predicted_scores[1]), 2)
        }
    except Exception as e:
        # Best-effort contract: callers always receive a dict, never an exception.
        print(f"Prediction error: {str(e)}")
        return {
            "team1": team1,
            "team2": team2,
            "win_probability_team1": 50.0,
            "expected_team1_score": 0.0,
            "expected_team2_score": 0.0
        }