|
from flask_sqlalchemy import SQLAlchemy |
|
from flask_login import UserMixin |
|
from datetime import datetime |
|
import math |
|
from sqlalchemy import func |
|
|
|
db = SQLAlchemy() |
|
|
|
|
|
class User(db.Model, UserMixin): |
|
id = db.Column(db.Integer, primary_key=True) |
|
username = db.Column(db.String(100), unique=True, nullable=False) |
|
hf_id = db.Column(db.String(100), unique=True, nullable=False) |
|
join_date = db.Column(db.DateTime, default=datetime.utcnow) |
|
votes = db.relationship("Vote", backref="user", lazy=True) |
|
show_in_leaderboard = db.Column(db.Boolean, default=True) |
|
|
|
def __repr__(self): |
|
return f"<User {self.username}>" |
|
|
|
|
|
class ModelType: |
|
TTS = "tts" |
|
CONVERSATIONAL = "conversational" |
|
|
|
|
|
class Model(db.Model): |
|
id = db.Column(db.String(100), primary_key=True) |
|
name = db.Column(db.String(100), nullable=False) |
|
model_type = db.Column(db.String(20), nullable=False) |
|
|
|
votes = db.relationship( |
|
"Vote", |
|
primaryjoin="or_(Model.id==Vote.model_chosen, Model.id==Vote.model_rejected)", |
|
viewonly=True, |
|
) |
|
current_elo = db.Column(db.Float, default=1500.0) |
|
win_count = db.Column(db.Integer, default=0) |
|
match_count = db.Column(db.Integer, default=0) |
|
is_open = db.Column(db.Boolean, default=False) |
|
is_active = db.Column( |
|
db.Boolean, default=True |
|
) |
|
model_url = db.Column(db.String(255), nullable=True) |
|
|
|
@property |
|
def win_rate(self): |
|
if self.match_count == 0: |
|
return 0 |
|
return (self.win_count / self.match_count) * 100 |
|
|
|
def __repr__(self): |
|
return f"<Model {self.name} ({self.model_type})>" |
|
|
|
|
|
class Vote(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) |
|
text = db.Column(db.String(1000), nullable=False) |
|
vote_date = db.Column(db.DateTime, default=datetime.utcnow) |
|
model_chosen = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False) |
|
model_rejected = db.Column( |
|
db.String(100), db.ForeignKey("model.id"), nullable=False |
|
) |
|
model_type = db.Column(db.String(20), nullable=False) |
|
|
|
chosen = db.relationship( |
|
"Model", |
|
foreign_keys=[model_chosen], |
|
backref=db.backref("chosen_votes", lazy=True), |
|
) |
|
rejected = db.relationship( |
|
"Model", |
|
foreign_keys=[model_rejected], |
|
backref=db.backref("rejected_votes", lazy=True), |
|
) |
|
|
|
def __repr__(self): |
|
return f"<Vote {self.id}: {self.model_chosen} over {self.model_rejected} ({self.model_type})>" |
|
|
|
|
|
class EloHistory(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
model_id = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False) |
|
timestamp = db.Column(db.DateTime, default=datetime.utcnow) |
|
elo_score = db.Column(db.Float, nullable=False) |
|
vote_id = db.Column(db.Integer, db.ForeignKey("vote.id"), nullable=True) |
|
model_type = db.Column(db.String(20), nullable=False) |
|
|
|
model = db.relationship("Model", backref=db.backref("elo_history", lazy=True)) |
|
vote = db.relationship("Vote", backref=db.backref("elo_changes", lazy=True)) |
|
|
|
def __repr__(self): |
|
return f"<EloHistory {self.model_id}: {self.elo_score} at {self.timestamp} ({self.model_type})>" |
|
|
|
|
|
def calculate_elo_change(winner_elo, loser_elo, k_factor=32): |
|
"""Calculate Elo rating changes for a match.""" |
|
expected_winner = 1 / (1 + math.pow(10, (loser_elo - winner_elo) / 400)) |
|
expected_loser = 1 / (1 + math.pow(10, (winner_elo - loser_elo) / 400)) |
|
|
|
winner_new_elo = winner_elo + k_factor * (1 - expected_winner) |
|
loser_new_elo = loser_elo + k_factor * (0 - expected_loser) |
|
|
|
return winner_new_elo, loser_new_elo |
|
|
|
|
|
def record_vote(user_id, text, chosen_model_id, rejected_model_id, model_type): |
|
"""Record a vote and update Elo ratings.""" |
|
|
|
vote = Vote( |
|
user_id=user_id, |
|
text=text, |
|
model_chosen=chosen_model_id, |
|
model_rejected=rejected_model_id, |
|
model_type=model_type, |
|
) |
|
db.session.add(vote) |
|
db.session.flush() |
|
|
|
|
|
chosen_model = Model.query.filter_by( |
|
id=chosen_model_id, model_type=model_type |
|
).first() |
|
rejected_model = Model.query.filter_by( |
|
id=rejected_model_id, model_type=model_type |
|
).first() |
|
|
|
if not chosen_model or not rejected_model: |
|
db.session.rollback() |
|
return None, "One or both models not found for the specified model type" |
|
|
|
|
|
new_chosen_elo, new_rejected_elo = calculate_elo_change( |
|
chosen_model.current_elo, rejected_model.current_elo |
|
) |
|
|
|
|
|
chosen_model.current_elo = new_chosen_elo |
|
chosen_model.win_count += 1 |
|
chosen_model.match_count += 1 |
|
|
|
rejected_model.current_elo = new_rejected_elo |
|
rejected_model.match_count += 1 |
|
|
|
|
|
chosen_history = EloHistory( |
|
model_id=chosen_model_id, |
|
elo_score=new_chosen_elo, |
|
vote_id=vote.id, |
|
model_type=model_type, |
|
) |
|
|
|
rejected_history = EloHistory( |
|
model_id=rejected_model_id, |
|
elo_score=new_rejected_elo, |
|
vote_id=vote.id, |
|
model_type=model_type, |
|
) |
|
|
|
db.session.add_all([chosen_history, rejected_history]) |
|
db.session.commit() |
|
|
|
return vote, None |
|
|
|
|
|
def get_leaderboard_data(model_type): |
|
""" |
|
Get leaderboard data for the specified model type. |
|
|
|
Args: |
|
model_type (str): The model type ('tts' or 'conversational') |
|
|
|
Returns: |
|
list: List of dictionaries containing model data for the leaderboard |
|
""" |
|
query = Model.query.filter_by(model_type=model_type) |
|
|
|
|
|
models = query.order_by(Model.current_elo.desc()).all() |
|
|
|
result = [] |
|
for rank, model in enumerate(models, 1): |
|
|
|
if rank <= 2: |
|
tier = "tier-s" |
|
elif rank <= 4: |
|
tier = "tier-a" |
|
elif rank <= 7: |
|
tier = "tier-b" |
|
else: |
|
tier = "" |
|
|
|
result.append( |
|
{ |
|
"rank": rank, |
|
"id": model.id, |
|
"name": model.name, |
|
"model_url": model.model_url, |
|
"win_rate": f"{model.win_rate:.0f}%", |
|
"total_votes": model.match_count, |
|
"elo": int(model.current_elo), |
|
"tier": tier, |
|
"is_open": model.is_open, |
|
} |
|
) |
|
|
|
return result |
|
|
|
|
|
def get_user_leaderboard(user_id, model_type): |
|
""" |
|
Get personalized leaderboard data for a specific user. |
|
|
|
Args: |
|
user_id (int): The user ID |
|
model_type (str): The model type ('tts' or 'conversational') |
|
|
|
Returns: |
|
list: List of dictionaries containing model data for the user's personal leaderboard |
|
""" |
|
|
|
models = Model.query.filter_by(model_type=model_type).all() |
|
|
|
|
|
user_votes = Vote.query.filter_by(user_id=user_id, model_type=model_type).all() |
|
|
|
|
|
model_stats = {model.id: {"wins": 0, "matches": 0} for model in models} |
|
|
|
for vote in user_votes: |
|
model_stats[vote.model_chosen]["wins"] += 1 |
|
model_stats[vote.model_chosen]["matches"] += 1 |
|
model_stats[vote.model_rejected]["matches"] += 1 |
|
|
|
|
|
result = [] |
|
for model in models: |
|
stats = model_stats[model.id] |
|
win_rate = ( |
|
(stats["wins"] / stats["matches"] * 100) if stats["matches"] > 0 else 0 |
|
) |
|
|
|
|
|
if stats["matches"] > 0: |
|
result.append( |
|
{ |
|
"id": model.id, |
|
"name": model.name, |
|
"model_url": model.model_url, |
|
"win_rate": f"{win_rate:.0f}%", |
|
"total_votes": stats["matches"], |
|
"wins": stats["wins"], |
|
"is_open": model.is_open, |
|
} |
|
) |
|
|
|
|
|
result.sort(key=lambda x: float(x["win_rate"].rstrip("%")), reverse=True) |
|
|
|
|
|
for i, item in enumerate(result, 1): |
|
item["rank"] = i |
|
|
|
return result |
|
|
|
|
|
def get_historical_leaderboard_data(model_type, target_date=None): |
|
""" |
|
Get leaderboard data at a specific date in history. |
|
|
|
Args: |
|
model_type (str): The model type ('tts' or 'conversational') |
|
target_date (datetime): The target date for historical data, defaults to current time |
|
|
|
Returns: |
|
list: List of dictionaries containing model data for the historical leaderboard |
|
""" |
|
if not target_date: |
|
target_date = datetime.utcnow() |
|
|
|
|
|
models = Model.query.filter_by(model_type=model_type).all() |
|
|
|
|
|
result = [] |
|
|
|
for model in models: |
|
|
|
elo_entry = ( |
|
EloHistory.query.filter( |
|
EloHistory.model_id == model.id, |
|
EloHistory.model_type == model_type, |
|
EloHistory.timestamp <= target_date, |
|
) |
|
.order_by(EloHistory.timestamp.desc()) |
|
.first() |
|
) |
|
|
|
|
|
if not elo_entry: |
|
continue |
|
|
|
|
|
match_count = Vote.query.filter( |
|
db.or_(Vote.model_chosen == model.id, Vote.model_rejected == model.id), |
|
Vote.model_type == model_type, |
|
Vote.vote_date <= target_date, |
|
).count() |
|
|
|
win_count = Vote.query.filter( |
|
Vote.model_chosen == model.id, |
|
Vote.model_type == model_type, |
|
Vote.vote_date <= target_date, |
|
).count() |
|
|
|
|
|
win_rate = (win_count / match_count * 100) if match_count > 0 else 0 |
|
|
|
|
|
result.append( |
|
{ |
|
"id": model.id, |
|
"name": model.name, |
|
"model_url": model.model_url, |
|
"win_rate": f"{win_rate:.0f}%", |
|
"total_votes": match_count, |
|
"elo": int(elo_entry.elo_score), |
|
"is_open": model.is_open, |
|
} |
|
) |
|
|
|
|
|
result.sort(key=lambda x: x["elo"], reverse=True) |
|
|
|
|
|
for i, item in enumerate(result, 1): |
|
item["rank"] = i |
|
|
|
if i <= 2: |
|
item["tier"] = "tier-s" |
|
elif i <= 4: |
|
item["tier"] = "tier-a" |
|
elif i <= 7: |
|
item["tier"] = "tier-b" |
|
else: |
|
item["tier"] = "" |
|
|
|
return result |
|
|
|
|
|
def get_key_historical_dates(model_type): |
|
""" |
|
Get a list of key dates in the leaderboard history. |
|
|
|
Args: |
|
model_type (str): The model type ('tts' or 'conversational') |
|
|
|
Returns: |
|
list: List of datetime objects representing key dates |
|
""" |
|
|
|
first_vote = ( |
|
Vote.query.filter_by(model_type=model_type) |
|
.order_by(Vote.vote_date.asc()) |
|
.first() |
|
) |
|
last_vote = ( |
|
Vote.query.filter_by(model_type=model_type) |
|
.order_by(Vote.vote_date.desc()) |
|
.first() |
|
) |
|
|
|
if not first_vote or not last_vote: |
|
return [] |
|
|
|
|
|
dates = [] |
|
current_date = first_vote.vote_date.replace(day=1) |
|
end_date = last_vote.vote_date |
|
|
|
while current_date <= end_date: |
|
dates.append(current_date) |
|
|
|
if current_date.month == 12: |
|
current_date = current_date.replace(year=current_date.year + 1, month=1) |
|
else: |
|
current_date = current_date.replace(month=current_date.month + 1) |
|
|
|
|
|
if dates and dates[-1].month != end_date.month or dates[-1].year != end_date.year: |
|
dates.append(end_date) |
|
|
|
return dates |
|
|
|
|
|
def insert_initial_models(): |
|
"""Insert initial models into the database.""" |
|
tts_models = [ |
|
Model( |
|
id="index-tts", |
|
name="Index TTS", |
|
model_type=ModelType.TTS, |
|
is_open=True, |
|
model_url="https://github.com/Index-Research/index-tts", |
|
), |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Model( |
|
id="spark-tts", |
|
name="Spark TTS", |
|
model_type=ModelType.TTS, |
|
is_open=False, |
|
is_active=False, |
|
model_url="https://github.com/SparkAudio/Spark-TTS", |
|
), |
|
Model( |
|
id="maskgct", |
|
name="maskgct", |
|
model_type=ModelType.TTS, |
|
is_open=False, |
|
is_active=True, |
|
model_url="https://github.com/open-mmlab/Amphion/tree/main/models/tts/maskgct", |
|
), |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Model( |
|
id="cosyvoice-2.0", |
|
name="CosyVoice 2.0", |
|
model_type=ModelType.TTS, |
|
is_open=True, |
|
model_url="https://github.com/FunAudioLLM/CosyVoice", |
|
), |
|
Model( |
|
id="gpt-sovits-v2", |
|
name="GPT-SoVITS v2", |
|
model_type=ModelType.TTS, |
|
is_open=True, |
|
model_url="https://huggingface.co/spaces/lj1995/GPT-SoVITS-v2", |
|
), |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
] |
|
conversational_models = [ |
|
Model( |
|
id="csm-1b", |
|
name="CSM 1B", |
|
model_type=ModelType.CONVERSATIONAL, |
|
is_open=True, |
|
model_url="https://huggingface.co/sesame/csm-1b", |
|
), |
|
Model( |
|
id="playdialog-1.0", |
|
name="PlayDialog 1.0", |
|
model_type=ModelType.CONVERSATIONAL, |
|
is_open=False, |
|
model_url="https://play.ht/", |
|
), |
|
Model( |
|
id="dia-1.6b", |
|
name="Dia 1.6B", |
|
model_type=ModelType.CONVERSATIONAL, |
|
is_open=True, |
|
model_url="https://huggingface.co/nari-labs/Dia-1.6B", |
|
), |
|
] |
|
|
|
all_models = tts_models + conversational_models |
|
|
|
for model in all_models: |
|
existing = Model.query.filter_by( |
|
id=model.id, model_type=model.model_type |
|
).first() |
|
if not existing: |
|
db.session.add(model) |
|
else: |
|
|
|
existing.name = model.name |
|
existing.is_open = model.is_open |
|
if model.is_active is not None: |
|
existing.is_active = model.is_active |
|
|
|
db.session.commit() |
|
|
|
|
|
def get_top_voters(limit=10): |
|
""" |
|
Get the top voters by number of votes. |
|
|
|
Args: |
|
limit (int): Number of users to return |
|
|
|
Returns: |
|
list: List of dictionaries containing user data and vote counts |
|
""" |
|
|
|
top_users = db.session.query( |
|
User, func.count(Vote.id).label('vote_count') |
|
).join(Vote).filter( |
|
User.show_in_leaderboard == True |
|
).group_by(User.id).order_by( |
|
func.count(Vote.id).desc() |
|
).limit(limit).all() |
|
|
|
result = [] |
|
for i, (user, vote_count) in enumerate(top_users, 1): |
|
result.append({ |
|
"rank": i, |
|
"username": user.username, |
|
"vote_count": vote_count, |
|
"join_date": user.join_date.strftime("%b %d, %Y") |
|
}) |
|
|
|
return result |
|
|
|
|
|
def toggle_user_leaderboard_visibility(user_id): |
|
""" |
|
Toggle whether a user appears in the voters leaderboard |
|
|
|
Args: |
|
user_id (int): The user ID |
|
|
|
Returns: |
|
bool: New visibility state |
|
""" |
|
user = User.query.get(user_id) |
|
if not user: |
|
return None |
|
|
|
user.show_in_leaderboard = not user.show_in_leaderboard |
|
db.session.commit() |
|
|
|
return user.show_in_leaderboard |
|
|