zhiminy committed
Commit 74e2c25 · 1 Parent(s): 8f68ab4

hybrid token pool

Files changed (2):
  1. app.py +273 -17
  2. msr.py +295 -31
app.py CHANGED
@@ -125,11 +125,15 @@ def normalize_date_format(date_string):
 # GITHUB API OPERATIONS
 # =============================================================================
 
-def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30):
+def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30, token_pool=None, token=None):
     """
     Perform an HTTP request with exponential backoff and jitter for GitHub API.
     Retries on 403/429 (rate limits), 5xx server errors, and transient network exceptions.
 
+    Args:
+        token_pool: Optional TokenPool instance for rate limit tracking
+        token: Optional token string to mark as rate-limited if 403/429 occurs
+
     Returns the final requests.Response on success or non-retryable status, or None after exhausting retries.
     """
     delay = 1.0
@@ -154,6 +158,7 @@ def request_with_backoff(method, url, *, headers=None, params=None, json_body=No
         # Rate limits or server errors -> retry with backoff
         if status in (403, 429) or 500 <= status < 600:
             wait = None
+            reset_timestamp = None
 
             # Prefer Retry-After when present
             retry_after = resp.headers.get('Retry-After') or resp.headers.get('retry-after')
@@ -168,11 +173,15 @@ def request_with_backoff(method, url, *, headers=None, params=None, json_body=No
             reset_hdr = resp.headers.get('X-RateLimit-Reset') or resp.headers.get('x-ratelimit-reset')
             if reset_hdr:
                 try:
-                    reset_ts = int(float(reset_hdr))
-                    wait = max(reset_ts - time.time() + 2, 1)
+                    reset_timestamp = int(float(reset_hdr))
+                    wait = max(reset_timestamp - time.time() + 2, 1)
                 except Exception:
                     wait = None
 
+            # Mark token as rate-limited if we have token pool and token
+            if status in (403, 429) and token_pool and token:
+                token_pool.mark_rate_limited(token, reset_timestamp)
+
             # Final fallback: exponential backoff with jitter
             if wait is None:
                 wait = delay + random.uniform(0, 0.5)
@@ -221,26 +230,179 @@ def get_github_token():
 
 class TokenPool:
     """
-    Manages a pool of GitHub tokens for load balancing across rate limits.
-    Rotates through tokens in round-robin fashion to distribute API calls.
+    Hybrid token pool with parallel execution and round-robin fallback.
+
+    Splits tokens into two pools:
+    - Parallel pool (50%): For concurrent API calls to maximize throughput
+    - Round-robin pool (50%): Backup pool for rate limit fallback
+
+    Features:
+    - Automatic fallback when parallel tokens hit rate limits
+    - Rate limit tracking with timestamp-based recovery
+    - Thread-safe token management
+    - Real-time statistics monitoring
     """
     def __init__(self, tokens):
-        self.tokens = tokens if tokens else [None]
-        self.current_index = 0
+        import threading
+
+        self.all_tokens = tokens if tokens else [None]
+        self.lock = threading.Lock()
+
+        # Split tokens into parallel and round-robin pools (50/50)
+        total_tokens = len(self.all_tokens)
+        split_point = max(1, total_tokens // 2)
+
+        self.parallel_tokens = self.all_tokens[:split_point]
+        self.roundrobin_tokens = self.all_tokens[split_point:] if total_tokens > 1 else self.all_tokens
+
+        # Round-robin index for fallback pool
+        self.roundrobin_index = 0
+
+        # Rate limit tracking: {token: reset_timestamp}
+        self.parallel_rate_limited = set()
+        self.roundrobin_rate_limited = set()
+        self.rate_limit_resets = {}
+
+        # Statistics
+        self.stats = {
+            'parallel_calls': 0,
+            'roundrobin_calls': 0,
+            'fallback_triggers': 0
+        }
+
+        print(f"📊 Token Pool Initialized:")
+        print(f"   Total tokens: {total_tokens}")
+        print(f"   Parallel pool: {len(self.parallel_tokens)} tokens")
+        print(f"   Round-robin pool: {len(self.roundrobin_tokens)} tokens")
+
+    def _cleanup_expired_rate_limits(self):
+        """Remove tokens from rate-limited sets if their reset time has passed."""
+        current_time = time.time()
+        expired_tokens = [
+            token for token, reset_time in self.rate_limit_resets.items()
+            if current_time >= reset_time
+        ]
+
+        for token in expired_tokens:
+            self.parallel_rate_limited.discard(token)
+            self.roundrobin_rate_limited.discard(token)
+            del self.rate_limit_resets[token]
+        if expired_tokens:
+            print(f"   ✓ Recovered {len(expired_tokens)} token(s) from rate limit")
+
+    def get_parallel_token(self):
+        """Get an available token from the parallel pool."""
+        with self.lock:
+            self._cleanup_expired_rate_limits()
+
+            # Find first non-rate-limited parallel token
+            for token in self.parallel_tokens:
+                if token not in self.parallel_rate_limited:
+                    self.stats['parallel_calls'] += 1
+                    return token
+
+            return None
+
+    def get_roundrobin_token(self):
+        """Get the next available token from round-robin pool."""
+        with self.lock:
+            self._cleanup_expired_rate_limits()
+
+            # Try all tokens in round-robin order
+            attempts = 0
+            while attempts < len(self.roundrobin_tokens):
+                token = self.roundrobin_tokens[self.roundrobin_index]
+                self.roundrobin_index = (self.roundrobin_index + 1) % len(self.roundrobin_tokens)
+
+                if token not in self.roundrobin_rate_limited:
+                    self.stats['roundrobin_calls'] += 1
+                    return token
+
+                attempts += 1
 
-    def get_next_token(self):
-        """Get the next token in round-robin order."""
-        if not self.tokens:
             return None
-        token = self.tokens[self.current_index]
-        self.current_index = (self.current_index + 1) % len(self.tokens)
+
+    def get_next_token(self):
+        """
+        Get next available token, trying parallel pool first, then falling back to round-robin.
+
+        Returns:
+            Token string or None if all tokens are rate-limited
+        """
+        # Try parallel pool first
+        token = self.get_parallel_token()
+        if token:
+            return token
+
+        # Fallback to round-robin pool
+        with self.lock:
+            self.stats['fallback_triggers'] += 1
+
+        token = self.get_roundrobin_token()
+        if not token:
+            print("   ⚠️ All tokens are rate-limited, waiting...")
+
         return token
 
     def get_headers(self):
-        """Get headers with the next token in rotation."""
+        """Get headers with the next available token."""
         token = self.get_next_token()
         return {'Authorization': f'token {token}'} if token else {}
 
+    def mark_rate_limited(self, token, reset_timestamp=None):
+        """
+        Mark a token as rate-limited with optional reset timestamp.
+
+        Args:
+            token: The token to mark as rate-limited
+            reset_timestamp: Unix timestamp when rate limit resets (optional)
+        """
+        if not token:
+            return
+
+        with self.lock:
+            # Determine which pool the token belongs to
+            if token in self.parallel_tokens:
+                self.parallel_rate_limited.add(token)
+            if token in self.roundrobin_tokens:
+                self.roundrobin_rate_limited.add(token)
+
+            # Store reset timestamp if provided
+            if reset_timestamp:
+                self.rate_limit_resets[token] = reset_timestamp
+                reset_time = datetime.fromtimestamp(reset_timestamp, tz=timezone.utc)
+                print(f"   ⏰ Token rate-limited until {reset_time.strftime('%H:%M:%S')} UTC")
+
+    def get_available_parallel_tokens(self):
+        """Get list of all available (non-rate-limited) parallel tokens."""
+        with self.lock:
+            self._cleanup_expired_rate_limits()
+            return [t for t in self.parallel_tokens if t not in self.parallel_rate_limited]
+
+    def get_stats(self):
+        """Get token pool usage statistics."""
+        with self.lock:
+            return {
+                'parallel_calls': self.stats['parallel_calls'],
+                'roundrobin_calls': self.stats['roundrobin_calls'],
+                'fallback_triggers': self.stats['fallback_triggers'],
+                'parallel_rate_limited': len(self.parallel_rate_limited),
+                'roundrobin_rate_limited': len(self.roundrobin_rate_limited)
+            }
+
+    def print_stats(self):
+        """Print token pool usage statistics."""
+        stats = self.get_stats()
+        total_calls = stats['parallel_calls'] + stats['roundrobin_calls']
+
+        print(f"\n📊 Token Pool Statistics:")
+        print(f"   Total API calls: {total_calls}")
+        if total_calls > 0:
+            print(f"   Parallel calls: {stats['parallel_calls']} ({stats['parallel_calls']/total_calls*100:.1f}%)")
+            print(f"   Round-robin calls: {stats['roundrobin_calls']} ({stats['roundrobin_calls']/total_calls*100:.1f}%)")
+        print(f"   Fallback triggers: {stats['fallback_triggers']}")
+        print(f"   Currently rate-limited: {stats['parallel_rate_limited']} parallel, {stats['roundrobin_rate_limited']} round-robin")
+
 
 def validate_github_username(identifier):
     """Verify that a GitHub identifier exists with backoff-aware requests."""
 
@@ -318,10 +480,11 @@ def fetch_reviews_with_time_partition(base_query, start_date, end_date, token_po
         'sort': 'created',
         'order': 'asc'
     }
-    headers = token_pool.get_headers()
+    token = token_pool.get_next_token()
+    headers = {'Authorization': f'token {token}'} if token else {}
 
     try:
-        response = request_with_backoff('GET', url, headers=headers, params=params)
+        response = request_with_backoff('GET', url, headers=headers, params=params, token_pool=token_pool, token=token)
         if response is None:
             print(f"{indent} Error: retries exhausted for range {start_str} to {end_str}")
             return total_in_partition
@@ -479,6 +642,98 @@ def fetch_reviews_with_time_partition(base_query, start_date, end_date, token_po
     return total_in_partition
 
 
+def fetch_reviews_parallel(query_patterns, start_date, end_date, token_pool, prs_by_url, debug_limit=None):
+    """
+    Fetch reviews for multiple query patterns in parallel using available parallel tokens.
+
+    This function uses ThreadPoolExecutor to execute multiple query patterns concurrently,
+    with each pattern using a dedicated token from the parallel pool. Falls back to
+    sequential execution if insufficient parallel tokens are available.
+
+    Args:
+        query_patterns: List of query pattern strings (e.g., ['is:pr author:bot1', 'is:pr reviewed-by:bot1'])
+        start_date: Start datetime for time range
+        end_date: End datetime for time range
+        token_pool: TokenPool instance for token management
+        prs_by_url: Dictionary to collect PRs by URL (shared across patterns)
+        debug_limit: Optional limit on total PRs to fetch (for testing)
+
+    Returns:
+        Total number of PRs found across all patterns
+    """
+    from concurrent.futures import ThreadPoolExecutor, as_completed
+    import threading
+
+    # Check how many parallel tokens are available
+    available_tokens = token_pool.get_available_parallel_tokens()
+
+    if len(available_tokens) < 2 or len(query_patterns) < 2:
+        # Not enough tokens or patterns for parallelization, use sequential
+        print(f"   ⚠️ Sequential execution: {len(available_tokens)} parallel tokens available for {len(query_patterns)} patterns")
+        total_found = 0
+        for pattern in query_patterns:
+            pattern_prs = {}
+            count = fetch_reviews_with_time_partition(
+                pattern, start_date, end_date, token_pool, pattern_prs, debug_limit, depth=0
+            )
+            # Merge pattern results into global dict
+            with threading.Lock():
+                for url, pr in pattern_prs.items():
+                    if url not in prs_by_url:
+                        prs_by_url[url] = pr
+            total_found += count
+        return total_found
+
+    # Use parallel execution
+    print(f"   🚀 Parallel execution: {len(available_tokens)} parallel tokens for {len(query_patterns)} patterns")
+
+    # Thread-safe lock for updating prs_by_url
+    lock = threading.Lock()
+
+    def fetch_pattern(pattern):
+        """Fetch reviews for a single pattern (runs in parallel)."""
+        pattern_prs = {}
+        try:
+            count = fetch_reviews_with_time_partition(
+                pattern, start_date, end_date, token_pool, pattern_prs, debug_limit, depth=0
+            )
+            return pattern, pattern_prs, count
+        except Exception as e:
+            print(f"   Error fetching pattern '{pattern}': {str(e)}")
+            return pattern, {}, 0
+
+    # Execute patterns in parallel
+    max_workers = min(len(query_patterns), len(available_tokens))
+    total_found = 0
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        # Submit all patterns
+        future_to_pattern = {
+            executor.submit(fetch_pattern, pattern): pattern
+            for pattern in query_patterns
+        }
+
+        # Collect results as they complete
+        for future in as_completed(future_to_pattern):
+            pattern = future_to_pattern[future]
+            try:
+                _, pattern_prs, count = future.result()
+
+                # Merge results into global dict (thread-safe)
+                with lock:
+                    for url, pr in pattern_prs.items():
+                        if url not in prs_by_url:
+                            prs_by_url[url] = pr
+
+                total_found += count
+                print(f"   ✓ Pattern '{pattern}' completed: {count} PRs found")
+
+            except Exception as e:
+                print(f"   ✗ Pattern '{pattern}' failed: {str(e)}")
+
+    return total_found
+
+
 def extract_review_metadata(pr):
     """
     Extract minimal PR review metadata for efficient storage.
@@ -576,8 +831,9 @@ def update_pr_status(metadata_list, token_pool):
         owner, repo, pull_word, pr_number = parts[0], parts[1], parts[2], parts[3]
         api_url = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}'
 
-        headers = token_pool.get_headers()
-        response = request_with_backoff('GET', api_url, headers=headers, max_retries=3)
+        token = token_pool.get_next_token()
+        headers = {'Authorization': f'token {token}'} if token else {}
+        response = request_with_backoff('GET', api_url, headers=headers, max_retries=3, token_pool=token_pool, token=token)
 
         if response and response.status_code == 200:
             pr_data = response.json()
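
Both app.py call sites now share the same round trip: take a token explicitly, build the Authorization header from it, and pass the pool and the token into request_with_backoff so a 403/429 can be reported back. A sketch of that pattern; the search query here is illustrative, not taken from the commit:

token = token_pool.get_next_token()
headers = {'Authorization': f'token {token}'} if token else {}
response = request_with_backoff(
    'GET', 'https://api.github.com/search/issues',
    headers=headers, params={'q': 'is:pr reviewed-by:example-bot'},
    token_pool=token_pool, token=token,
)
# On 403/429, request_with_backoff records the X-RateLimit-Reset timestamp via
# token_pool.mark_rate_limited(token, reset_timestamp), so subsequent
# get_next_token() calls skip this token until the reset time passes.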
msr.py CHANGED
@@ -69,26 +69,180 @@ def get_github_tokens():
 
 class TokenPool:
     """
-    Manages a pool of GitHub tokens for load balancing across rate limits.
-    Rotates through tokens in round-robin fashion to distribute API calls.
+    Hybrid token pool with parallel execution and round-robin fallback.
+
+    Splits tokens into two pools:
+    - Parallel pool (50%): For concurrent API calls to maximize throughput
+    - Round-robin pool (50%): Backup pool for rate limit fallback
+
+    Features:
+    - Automatic fallback when parallel tokens hit rate limits
+    - Rate limit tracking with timestamp-based recovery
+    - Thread-safe token management
+    - Real-time statistics monitoring
     """
     def __init__(self, tokens):
-        self.tokens = tokens if tokens else [None]
-        self.current_index = 0
+        import threading
+
+        self.all_tokens = tokens if tokens else [None]
+        self.lock = threading.Lock()
+
+        # Split tokens into parallel and round-robin pools (50/50)
+        total_tokens = len(self.all_tokens)
+        split_point = max(1, total_tokens // 2)
+
+        self.parallel_tokens = self.all_tokens[:split_point]
+        self.roundrobin_tokens = self.all_tokens[split_point:] if total_tokens > 1 else self.all_tokens
+
+        # Round-robin index for fallback pool
+        self.roundrobin_index = 0
+
+        # Rate limit tracking: {token: reset_timestamp}
+        self.parallel_rate_limited = set()
+        self.roundrobin_rate_limited = set()
+        self.rate_limit_resets = {}
+
+        # Statistics
+        self.stats = {
+            'parallel_calls': 0,
+            'roundrobin_calls': 0,
+            'fallback_triggers': 0
+        }
+
+        print(f"📊 Token Pool Initialized:")
+        print(f"   Total tokens: {total_tokens}")
+        print(f"   Parallel pool: {len(self.parallel_tokens)} tokens")
+        print(f"   Round-robin pool: {len(self.roundrobin_tokens)} tokens")
+
+    def _cleanup_expired_rate_limits(self):
+        """Remove tokens from rate-limited sets if their reset time has passed."""
+        current_time = time.time()
+        expired_tokens = [
+            token for token, reset_time in self.rate_limit_resets.items()
+            if current_time >= reset_time
+        ]
+
+        for token in expired_tokens:
+            self.parallel_rate_limited.discard(token)
+            self.roundrobin_rate_limited.discard(token)
+            del self.rate_limit_resets[token]
+        if expired_tokens:
+            print(f"   ✓ Recovered {len(expired_tokens)} token(s) from rate limit")
+
+    def get_parallel_token(self):
+        """Get an available token from the parallel pool."""
+        with self.lock:
+            self._cleanup_expired_rate_limits()
+
+            # Find first non-rate-limited parallel token
+            for token in self.parallel_tokens:
+                if token not in self.parallel_rate_limited:
+                    self.stats['parallel_calls'] += 1
+                    return token
 
-    def get_next_token(self):
-        """Get the next token in round-robin order."""
-        if not self.tokens:
             return None
-        token = self.tokens[self.current_index]
-        self.current_index = (self.current_index + 1) % len(self.tokens)
+
+    def get_roundrobin_token(self):
+        """Get the next available token from round-robin pool."""
+        with self.lock:
+            self._cleanup_expired_rate_limits()
+
+            # Try all tokens in round-robin order
+            attempts = 0
+            while attempts < len(self.roundrobin_tokens):
+                token = self.roundrobin_tokens[self.roundrobin_index]
+                self.roundrobin_index = (self.roundrobin_index + 1) % len(self.roundrobin_tokens)
+
+                if token not in self.roundrobin_rate_limited:
+                    self.stats['roundrobin_calls'] += 1
+                    return token
+
+                attempts += 1
+
+            return None
+
+    def get_next_token(self):
+        """
+        Get next available token, trying parallel pool first, then falling back to round-robin.
+
+        Returns:
+            Token string or None if all tokens are rate-limited
+        """
+        # Try parallel pool first
+        token = self.get_parallel_token()
+        if token:
+            return token
+
+        # Fallback to round-robin pool
+        with self.lock:
+            self.stats['fallback_triggers'] += 1
+
+        token = self.get_roundrobin_token()
+        if not token:
+            print("   ⚠️ All tokens are rate-limited, waiting...")
+
         return token
 
     def get_headers(self):
-        """Get headers with the next token in rotation."""
+        """Get headers with the next available token."""
         token = self.get_next_token()
         return {'Authorization': f'token {token}'} if token else {}
 
+    def mark_rate_limited(self, token, reset_timestamp=None):
+        """
+        Mark a token as rate-limited with optional reset timestamp.
+
+        Args:
+            token: The token to mark as rate-limited
+            reset_timestamp: Unix timestamp when rate limit resets (optional)
+        """
+        if not token:
+            return
+
+        with self.lock:
+            # Determine which pool the token belongs to
+            if token in self.parallel_tokens:
+                self.parallel_rate_limited.add(token)
+            if token in self.roundrobin_tokens:
+                self.roundrobin_rate_limited.add(token)
+
+            # Store reset timestamp if provided
+            if reset_timestamp:
+                self.rate_limit_resets[token] = reset_timestamp
+                from datetime import datetime, timezone
+                reset_time = datetime.fromtimestamp(reset_timestamp, tz=timezone.utc)
+                print(f"   ⏰ Token rate-limited until {reset_time.strftime('%H:%M:%S')} UTC")
+
+    def get_available_parallel_tokens(self):
+        """Get list of all available (non-rate-limited) parallel tokens."""
+        with self.lock:
+            self._cleanup_expired_rate_limits()
+            return [t for t in self.parallel_tokens if t not in self.parallel_rate_limited]
+
+    def get_stats(self):
+        """Get token pool usage statistics."""
+        with self.lock:
+            return {
+                'parallel_calls': self.stats['parallel_calls'],
+                'roundrobin_calls': self.stats['roundrobin_calls'],
+                'fallback_triggers': self.stats['fallback_triggers'],
+                'parallel_rate_limited': len(self.parallel_rate_limited),
+                'roundrobin_rate_limited': len(self.roundrobin_rate_limited)
+            }
+
+    def print_stats(self):
+        """Print token pool usage statistics."""
+        stats = self.get_stats()
+        total_calls = stats['parallel_calls'] + stats['roundrobin_calls']
+
+        print(f"\n📊 Token Pool Statistics:")
+        print(f"   Total API calls: {total_calls}")
+        if total_calls > 0:
+            print(f"   Parallel calls: {stats['parallel_calls']} ({stats['parallel_calls']/total_calls*100:.1f}%)")
+            print(f"   Round-robin calls: {stats['roundrobin_calls']} ({stats['roundrobin_calls']/total_calls*100:.1f}%)")
+        print(f"   Fallback triggers: {stats['fallback_triggers']}")
+        print(f"   Currently rate-limited: {stats['parallel_rate_limited']} parallel, {stats['roundrobin_rate_limited']} round-robin")
+
 
 def get_hf_token():
     """Get HuggingFace token from environment variables."""
@@ -102,10 +256,16 @@ def get_hf_token():
 # GITHUB API FUNCTIONS
 # =============================================================================
 
-def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30):
+def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30, token_pool=None, token=None):
     """
     Perform an HTTP request with exponential backoff and jitter for GitHub API.
     Retries on 403/429 (rate limits), 5xx server errors, and transient network exceptions.
+
+    Args:
+        token_pool: Optional TokenPool instance for rate limit tracking
+        token: Optional token string to mark as rate-limited if 403/429 occurs
+
+    Returns the final requests.Response on success or non-retryable status, or None after exhausting retries.
     """
     delay = 1.0
     for attempt in range(max_retries):
@@ -129,6 +289,7 @@ def request_with_backoff(method, url, *, headers=None, params=None, json_body=No
         # Rate limits or server errors -> retry with backoff
         if status in (403, 429) or 500 <= status < 600:
             wait = None
+            reset_timestamp = None
 
             # Prefer Retry-After when present
             retry_after = resp.headers.get('Retry-After') or resp.headers.get('retry-after')
@@ -143,11 +304,15 @@ def request_with_backoff(method, url, *, headers=None, params=None, json_body=No
             reset_hdr = resp.headers.get('X-RateLimit-Reset') or resp.headers.get('x-ratelimit-reset')
             if reset_hdr:
                 try:
-                    reset_ts = int(float(reset_hdr))
-                    wait = max(reset_ts - time.time() + 2, 1)
+                    reset_timestamp = int(float(reset_hdr))
+                    wait = max(reset_timestamp - time.time() + 2, 1)
                 except Exception:
                     wait = None
 
+            # Mark token as rate-limited if we have token pool and token
+            if status in (403, 429) and token_pool and token:
+                token_pool.mark_rate_limited(token, reset_timestamp)
+
             # Final fallback: exponential backoff with jitter
             if wait is None:
                 wait = delay + random.uniform(0, 0.5)
@@ -223,10 +388,11 @@ def fetch_reviews_with_time_partition(base_query, start_date, end_date, token_po
         'sort': 'created',
         'order': 'asc'
     }
-    headers = token_pool.get_headers()
+    token = token_pool.get_next_token()
+    headers = {'Authorization': f'token {token}'} if token else {}
 
     try:
-        response = request_with_backoff('GET', url, headers=headers, params=params)
+        response = request_with_backoff('GET', url, headers=headers, params=params, token_pool=token_pool, token=token)
         if response is None:
             print(f"{indent} Error: retries exhausted for range {start_str} to {end_str}")
             return total_in_partition
@@ -373,6 +539,98 @@ def fetch_reviews_with_time_partition(base_query, start_date, end_date, token_po
     return total_in_partition
 
 
+def fetch_reviews_parallel(query_patterns, start_date, end_date, token_pool, prs_by_url):
+    """
+    Fetch reviews for multiple query patterns in parallel using available parallel tokens.
+
+    This function uses ThreadPoolExecutor to execute multiple query patterns concurrently,
+    with each pattern using a dedicated token from the parallel pool. Falls back to
+    sequential execution if insufficient parallel tokens are available.
+
+    Args:
+        query_patterns: List of query pattern strings (e.g., ['is:pr author:bot1', 'is:pr reviewed-by:bot1'])
+        start_date: Start datetime for time range
+        end_date: End datetime for time range
+        token_pool: TokenPool instance for token management
+        prs_by_url: Dictionary to collect PRs by URL (shared across patterns)
+
+    Returns:
+        Total number of PRs found across all patterns
+    """
+    from concurrent.futures import ThreadPoolExecutor, as_completed
+    import threading
+
+    # Check how many parallel tokens are available
+    available_tokens = token_pool.get_available_parallel_tokens()
+
+    if len(available_tokens) < 2 or len(query_patterns) < 2:
+        # Not enough tokens or patterns for parallelization, use sequential
+        print(f"   ⚠️ Sequential execution: {len(available_tokens)} parallel tokens available for {len(query_patterns)} patterns")
+        total_found = 0
+        for pattern in query_patterns:
+            pattern_prs = {}
+            count = fetch_reviews_with_time_partition(
+                pattern, start_date, end_date, token_pool, pattern_prs, depth=0
+            )
+            # Merge pattern results into global dict
+            lock = threading.Lock()
+            with lock:
+                for url, pr in pattern_prs.items():
+                    if url not in prs_by_url:
+                        prs_by_url[url] = pr
+            total_found += count
+        return total_found
+
+    # Use parallel execution
+    print(f"   🚀 Parallel execution: {len(available_tokens)} parallel tokens for {len(query_patterns)} patterns")
+
+    # Thread-safe lock for updating prs_by_url
+    lock = threading.Lock()
+
+    def fetch_pattern(pattern):
+        """Fetch reviews for a single pattern (runs in parallel)."""
+        pattern_prs = {}
+        try:
+            count = fetch_reviews_with_time_partition(
+                pattern, start_date, end_date, token_pool, pattern_prs, depth=0
+            )
+            return pattern, pattern_prs, count
+        except Exception as e:
+            print(f"   Error fetching pattern '{pattern}': {str(e)}")
+            return pattern, {}, 0
+
+    # Execute patterns in parallel
+    max_workers = min(len(query_patterns), len(available_tokens))
+    total_found = 0
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        # Submit all patterns
+        future_to_pattern = {
+            executor.submit(fetch_pattern, pattern): pattern
+            for pattern in query_patterns
+        }
+
+        # Collect results as they complete
+        for future in as_completed(future_to_pattern):
+            pattern = future_to_pattern[future]
+            try:
+                _, pattern_prs, count = future.result()
+
+                # Merge results into global dict (thread-safe)
+                with lock:
+                    for url, pr in pattern_prs.items():
+                        if url not in prs_by_url:
+                            prs_by_url[url] = pr
+
+                total_found += count
+                print(f"   ✓ Pattern '{pattern}' completed: {count} PRs found")
+
+            except Exception as e:
+                print(f"   ✗ Pattern '{pattern}' failed: {str(e)}")
+
+    return total_found
+
+
 def extract_review_metadata(pr):
     """
     Extract minimal PR review metadata for efficient storage.
@@ -454,8 +712,9 @@ def update_pr_status(metadata_list, token_pool):
         owner, repo, pull_word, pr_number = parts[0], parts[1], parts[2], parts[3]
         api_url = f'https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}'
 
-        headers = token_pool.get_headers()
-        response = request_with_backoff('GET', api_url, headers=headers, max_retries=3)
+        token = token_pool.get_next_token()
+        headers = {'Authorization': f'token {token}'} if token else {}
+        response = request_with_backoff('GET', api_url, headers=headers, max_retries=3, token_pool=token_pool, token=token)
 
         if response and response.status_code == 200:
             pr_data = response.json()
@@ -529,29 +788,34 @@ def fetch_all_reviews_metadata(identifier, agent_name, token_pool):
     end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)  # 12:00 AM UTC today
     start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
 
-    for query_pattern in query_patterns:
-        print(f"\n🔍 Searching with query: {query_pattern}")
-        print(f"   Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')} (today excluded)")
+    print(f"\n🔍 Searching for PRs reviewed by {identifier}")
+    print(f"   Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')} (today excluded)")
+    print(f"   Query patterns: {len(query_patterns)}")
 
-        pattern_start_time = time.time()
-        initial_count = len(prs_by_url)
+    overall_start_time = time.time()
 
-        # Fetch with time partitioning
+    # Use parallel execution if multiple patterns and sufficient tokens
+    if len(query_patterns) > 1:
+        reviews_found = fetch_reviews_parallel(
+            query_patterns,
+            start_date,
+            end_date,
+            token_pool,
+            prs_by_url
+        )
+    else:
+        # Single pattern, use sequential
         reviews_found = fetch_reviews_with_time_partition(
-            query_pattern,
+            query_patterns[0],
             start_date,
            end_date,
            token_pool,
            prs_by_url
        )
 
-        pattern_duration = time.time() - pattern_start_time
-        new_reviews = len(prs_by_url) - initial_count
-
-        print(f"   ✓ Pattern complete: {new_reviews} new PRs found ({reviews_found} total fetched)")
-        print(f"   ⏱️ Time taken: {pattern_duration:.1f} seconds")
-
-        time.sleep(1.0)
+    overall_duration = time.time() - overall_start_time
+    print(f"   ✓ All patterns complete: {len(prs_by_url)} unique PRs found")
+    print(f"   ⏱️ Total time: {overall_duration:.1f} seconds")
 
 
     all_prs = list(prs_by_url.values())
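
A note on the merge step in fetch_reviews_parallel: each worker fills its own pattern_prs dict, and results are folded into the shared prs_by_url under one lock, deduplicating by PR URL so a PR matched by several patterns is counted once. A self-contained sketch of that idiom with dummy data (the URLs and values are toy stand-ins, not real search results):

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

prs_by_url = {}
lock = threading.Lock()

# Dummy per-pattern results; in the real code these come from fetch_reviews_with_time_partition.
pattern_results = [
    {'https://github.com/o/r/pull/1': 'pr1', 'https://github.com/o/r/pull/2': 'pr2'},
    {'https://github.com/o/r/pull/2': 'pr2', 'https://github.com/o/r/pull/3': 'pr3'},
]

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(dict, r) for r in pattern_results]
    for future in as_completed(futures):
        pattern_prs = future.result()
        with lock:  # serialize writers; the first writer wins per URL
            for url, pr in pattern_prs.items():
                if url not in prs_by_url:
                    prs_by_url[url] = pr

print(len(prs_by_url))  # 3: pull/2 appears under both patterns but is kept once

One caveat worth noting: in the sequential fallback branch of both files, a fresh lock is created at the merge site (lock = threading.Lock() in msr.py, with threading.Lock(): in app.py), so it serializes nothing; that branch is safe only because it runs single-threaded.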