# Source: src/algorithms/dual_ogd.py, uploaded by hamverbot (revision 03e57c7, verified).
"""
DualOGD Bidding Algorithm
Based on: Wang et al. "Learning to Bid in Repeated First-Price Auctions with Budgets" (2023)
arXiv: 2304.13477, Algorithm 1
The canonical Lagrangian dual multiplier approach with online gradient descent.
Core update:
λ_{t+1} = Proj_{λ≥0}(λ_t − ε · (ρ − c̃_t(b_t)))
Bid rule:
b_t = argmax_b (r̃_t(v_t, b) − λ_t · c̃_t(b))
Where:
v_t = value of winning = pCTR × value_per_click
r̃_t(v,b) = (v-b) · G̃_t(b) — empirical expected reward
c̃_t(b) = b · G̃_t(b) — empirical expected cost
G̃_t(b) = empirical win probability from historical competing bids
ρ = B/T = target spend per auction
The dual multiplier λ acts as a pace multiplier:
- If you overspend → λ increases → future bids are penalized more → spend decreases
- If you underspend → λ decreases → future bids are cheaper → spend increases
"""
import numpy as np
class DualOGDBidder:
    """Dual OGD bidder for first-price auctions with a budget constraint.

    Maintains a single dual multiplier ``lambd`` that is updated by projected
    online gradient descent after every auction, and picks each bid by a grid
    search over candidate prices scored against the empirical distribution of
    the competing bids observed so far (full-information feedback).
    """

    def __init__(
        self,
        budget,
        T,
        value_per_click,
        epsilon=None,
        empirical_cdf=None,
        name="DualOGD"
    ):
        """Initialise the bidder state.

        Args:
            budget: Total budget B.
            T: Time horizon (number of auctions); must be non-zero because the
                per-auction target spend is ``budget / T``.
            value_per_click: Value of each click in currency units.
            epsilon: Step size for the dual update. Default: ``1 / sqrt(T)``.
            empirical_cdf: Accepted for interface compatibility; this
                implementation does not use it and estimates win probability
                from ``competing_bids`` instead.
            name: Algorithm name reported by ``get_stats``.
        """
        self.B = budget
        self.T = T
        self.rho = budget / T  # Target spend per auction
        self.vpc = value_per_click
        self.name = name
        # Dual multiplier λ
        self.lambd = 0.0
        # Step size
        self.epsilon = epsilon if epsilon is not None else 1.0 / np.sqrt(T)
        # Spend tracking
        self.total_spent = 0.0
        self.remaining_budget = budget
        self.t = 0
        self.total_wins = 0
        # Initialised for callers; never modified by this class.
        self.total_clicks = 0
        # History for empirical estimation: all observed d_t values
        self.competing_bids = []

    def bid(self, pctr, features=None):
        """Compute the bid for the current auction.

        Increments the auction counter ``t`` on every call, including calls
        that return 0.0.

        Args:
            pctr: Predicted click probability pCTR.
            features: Optional feature vector (unused in this non-contextual
                version).

        Returns:
            The bid price as a float in ``[0, remaining_budget]``; 0.0 when the
            budget is exhausted or the bid ceiling is at most 0.1.
        """
        self.t += 1
        # Budget exhausted: sit the auction out.
        if self.remaining_budget <= 0:
            return 0.0
        v = pctr * self.vpc  # Value of winning this impression
        # Bid ceiling: at most twice the impression value, and never more than
        # the budget that is left.
        max_bid = min(v * 2.0, self.remaining_budget)
        # 0.1 is also the lowest price on the candidate grid, so a ceiling at
        # or below it leaves nothing to search.
        if max_bid <= 0.1:
            return 0.0
        # Find b_t = argmax_b (r̃_t(v,b) - λ · c̃_t(b))
        return self._find_optimal_bid(v, max_bid)

    def _find_optimal_bid(self, v, max_bid, n_candidates=50):
        """Grid-search the bid maximising ``(v - b)·G̃(b) - λ·b·G̃(b)``.

        Args:
            v: Value of winning the impression.
            max_bid: Upper end of the candidate grid.
            n_candidates: Number of evenly spaced candidates in
                ``[0.1, max_bid]``.

        Returns:
            The best-scoring candidate as a float (the lowest such candidate
            on ties). With no recorded competing bids, half the value, capped
            at ``max_bid``.
        """
        if len(self.competing_bids) == 0:
            # No history: bid half of value as exploration, but never above the
            # ceiling, so the bid cannot exceed the remaining budget.
            return float(min(v * 0.5, max_bid))
        candidates = np.linspace(0.1, max_bid, n_candidates)
        # Sort the history once and count, for every candidate at the same
        # time, how many competing bids are <= that candidate.
        history = np.sort(np.asarray(self.competing_bids, dtype=float))
        win_prob = np.searchsorted(history, candidates, side="right") / history.size
        reward = (v - candidates) * win_prob
        cost = candidates * win_prob
        scores = reward - self.lambd * cost
        # argmax returns the first maximum, i.e. the lowest bid among ties.
        return float(candidates[int(np.argmax(scores))])

    def _empirical_win_prob(self, b):
        """Return G̃_t(b), the fraction of recorded competing bids ≤ ``b``.

        Returns 0.5 when no competing bids have been recorded yet.
        """
        if len(self.competing_bids) == 0:
            return 0.5
        history = np.asarray(self.competing_bids, dtype=float)
        return float(np.mean(history <= b))

    def _empirical_expected_cost(self, b):
        """Return c̃_t(b) = b · G̃_t(b)."""
        return b * self._empirical_win_prob(b)

    def update(self, won, cost, pctr, d_t=None):
        """Update spend tracking, bid history and λ after an auction.

        Args:
            won: bool, whether the bid won.
            cost: Cost incurred; only counted when ``won`` is true.
            pctr: pCTR used for the bid (not read by this method).
            d_t: Maximum competing bid; appended to the history when not None.
        """
        if won:
            self.total_spent += cost
            self.remaining_budget -= cost
            self.total_wins += 1
        # Record competing bid for empirical estimation
        if d_t is not None:
            self.competing_bids.append(d_t)
        # Dual multiplier update: λ_{t+1} = max(0, λ_t - ε·(ρ - cost_t)).
        # The realised cost (0 on a loss) is used as the feedback signal.
        cost_feedback = cost if won else 0.0
        gradient = self.rho - cost_feedback
        self.lambd = max(0.0, self.lambd - self.epsilon * gradient)

    def get_stats(self):
        """Return a dict snapshot of the current algorithm state."""
        return {
            'name': self.name,
            'lambda': float(self.lambd),
            'spent': float(self.total_spent),
            'remaining': float(self.remaining_budget),
            # Guard against a non-positive budget to avoid dividing by zero.
            'budget_used': float(self.total_spent / self.B) if self.B > 0 else 0.0,
            'wins': self.total_wins,
            't': self.t,
            'epsilon': float(self.epsilon),
            'rho': float(self.rho),
        }
class TwoSidedDualBidder(DualOGDBidder):
    """Two-sided dual multiplier bidder: budget cap + spend floor.

    Adds a second dual variable ν to enforce a minimum spend fraction ``k``:
        μ: cap penalty — grows when an auction costs more than ρ
        ν: floor incentive — grows when an auction costs less than kρ

    Updates (using the realised cost of each auction, 0 on a loss):
        μ_{t+1} = max(0, μ_t - η₁·(ρ - cost_t))     # cap
        ν_{t+1} = max(0, ν_t - η₂·(cost_t - kρ))    # floor

    Bid rule:
        b_t = argmax_b (r̃_t(v,b) - (μ_t - ν_t)·c̃_t(b))

    When μ > ν the cap dominates and cost is penalised; when ν > μ the floor
    dominates and cost is rewarded, which pushes bids higher.
    """

    def __init__(
        self,
        budget,
        T,
        value_per_click,
        k=0.8,  # Minimum spend fraction
        epsilon_cap=None,
        epsilon_floor=None,
        empirical_cdf=None,
        name="TwoSidedDual"
    ):
        """Initialise the bidder state.

        Args:
            budget: Total budget B.
            T: Time horizon (number of auctions).
            value_per_click: Value of each click in currency units.
            k: Minimum spend fraction; the per-auction floor target is ``k·ρ``.
            epsilon_cap: Step size for the cap multiplier μ; passed to the
                base class as its ``epsilon``.
            epsilon_floor: Step size for the floor multiplier ν.
                Default: ``1 / sqrt(T)``.
            empirical_cdf: Forwarded to the base class constructor.
            name: Algorithm name reported by ``get_stats``.
        """
        super().__init__(budget, T, value_per_click, epsilon_cap, empirical_cdf, name)
        self.k = k  # Minimum spend fraction
        self.k_rho = k * self.rho  # Target minimum spend per auction
        # Floor dual multiplier ν
        self.nu = 0.0
        # Floor step size
        self.epsilon_floor = epsilon_floor if epsilon_floor is not None else 1.0 / np.sqrt(T)
        # Aliases for clarity: μ starts from the base multiplier, and the cap
        # step size is the base step size.
        self.mu = self.lambd  # Cap multiplier
        self.epsilon_cap = self.epsilon

    def _find_optimal_bid(self, v, max_bid, n_candidates=50):
        """Grid-search the bid using the combined multiplier ``μ - ν``.

        Args:
            v: Value of winning the impression.
            max_bid: Upper end of the candidate grid.
            n_candidates: Number of evenly spaced candidates in
                ``[0.1, max_bid]``.

        Returns:
            The best-scoring candidate as a float (the lowest such candidate
            on ties). With no recorded competing bids, half the value, capped
            at ``max_bid``.
        """
        if len(self.competing_bids) == 0:
            # No history: bid half of value as exploration, but never above the
            # ceiling, so the bid cannot exceed the remaining budget.
            return float(min(v * 0.5, max_bid))
        candidates = np.linspace(0.1, max_bid, n_candidates)
        effective_multiplier = self.mu - self.nu
        # Empirical win probability for every candidate at once: the fraction
        # of recorded competing bids that are <= the candidate.
        history = np.sort(np.asarray(self.competing_bids, dtype=float))
        win_prob = np.searchsorted(history, candidates, side="right") / history.size
        reward = (v - candidates) * win_prob
        cost = candidates * win_prob
        scores = reward - effective_multiplier * cost
        # argmax returns the first maximum, i.e. the lowest bid among ties.
        return float(candidates[int(np.argmax(scores))])

    def update(self, won, cost, pctr, d_t=None):
        """Update spend tracking, bid history and both dual variables.

        Args:
            won: bool, whether the bid won.
            cost: Cost incurred; only counted when ``won`` is true.
            pctr: pCTR used for the bid (not read by this method).
            d_t: Maximum competing bid; appended to the history when not None.
        """
        if won:
            self.total_spent += cost
            self.remaining_budget -= cost
            self.total_wins += 1
        if d_t is not None:
            self.competing_bids.append(d_t)
        cost_feedback = cost if won else 0.0
        # Cap update: μ_{t+1} = max(0, μ_t - η₁·(ρ - cost))
        cap_gradient = self.rho - cost_feedback
        self.mu = max(0.0, self.mu - self.epsilon_cap * cap_gradient)
        # Floor update: ν_{t+1} = max(0, ν_t - η₂·(cost - kρ))
        floor_gradient = cost_feedback - self.k_rho
        self.nu = max(0.0, self.nu - self.epsilon_floor * floor_gradient)
        # Keep lambd in sync so the base-class stats report μ as 'lambda'.
        self.lambd = self.mu

    def get_stats(self):
        """Return the base statistics extended with the two-sided state."""
        stats = super().get_stats()
        stats.update({
            'mu': float(self.mu),
            'nu': float(self.nu),
            'effective_multiplier': float(self.mu - self.nu),
            'k': float(self.k),
            'k_rho': float(self.k_rho),
        })
        return stats