Spaces:
Running
Running
File size: 5,198 Bytes
1b44660 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import type { WorkflowStep } from 'cloudflare:workers';
import { Logger } from './logger';
/**
 * Throttling knobs for {@link DomainRateLimiter}. All durations are in milliseconds.
 */
interface RateLimiterOptions {
  /** Upper bound on how many items are handed to the processor in one parallel round. */
  maxConcurrent: number;
  /** Pause applied between consecutive rounds while work is still outstanding. */
  globalCooldownMs: number;
  /** Minimum gap between two requests that target the same hostname. */
  domainCooldownMs: number;
}
/**
 * Minimal shape of something the limiter can schedule: an identifier plus the URL
 * whose hostname drives the per-domain cooldown.
 */
interface BatchItem<IdType = number | string> {
  /** Caller-chosen identifier, used for logging. */
  id: IdType;
  /** Absolute URL to be requested. */
  url: string;
}
/**
 * An item waiting to be processed. Its hostname is resolved once up front, and its position
 * in the caller's input is kept so results can be returned in input order.
 */
type PendingEntry<T> = {
  index: number;
  item: T;
  domain: string;
};

/**
 * Rate limiter that respects per-domain cooldowns to prevent overloading specific domains
 * when making HTTP requests. Handles batching and throttling of requests.
 *
 * Per-domain access times are kept in memory on the instance, so cooldowns carry over
 * between successive `processBatch` calls on the same limiter.
 * NOTE(review): that state is in memory only and is not persisted by this class — confirm
 * that losing it if the workflow instance is restarted is acceptable.
 *
 * @template T Type of the batch items, must extend BatchItem
 * @template I Type of the ID field, defaults to number | string
 */
export class DomainRateLimiter<T extends BatchItem<I>, I = number | string> {
  /** Hostname -> `Date.now()` timestamp at which the most recent request to it was started. */
  private lastDomainAccess = new Map<string, number>();
  private options: RateLimiterOptions;
  private logger: Logger;

  /**
   * Creates a new DomainRateLimiter instance
   *
   * @param options Configuration options for throttling
   */
  constructor(options: RateLimiterOptions) {
    this.options = options;
    this.logger = new Logger({ service: 'DomainRateLimiter' });
  }

  /**
   * Processes items with domain-aware rate limiting.
   *
   * Each round selects up to `maxConcurrent` items whose domain has been idle for at least
   * `domainCooldownMs`. When that cooldown is positive, a round also takes at most one item
   * per domain. The round runs in parallel, and then, if work remains, the limiter sleeps
   * `globalCooldownMs` via `step.sleep`. When no item is ready, it sleeps until the earliest
   * domain cooldown expires (at least 500 ms).
   *
   * @param items Items to process.
   * @param step Workflow step instance used for all sleeps/delays.
   * @param processItem Processes a single item; receives the item and its URL hostname.
   * @returns Results of the items that succeeded, in the same relative order as `items`.
   *   Items with an unparseable URL, and items whose `processItem` call rejected, are logged
   *   and omitted, so the array may be shorter than `items`.
   *
   * @template R The return type of the processItem function
   */
  async processBatch<R>(
    items: T[],
    step: WorkflowStep,
    processItem: (item: T, domain: string) => Promise<R>
  ): Promise<R[]> {
    const batchLogger = this.logger.child({ batch_size: items.length });
    batchLogger.info('Starting batch processing');

    // A limit below 1 would never admit an item, so the loop could never make progress.
    const maxConcurrent = Math.max(1, this.options.maxConcurrent);
    // Keyed by input index so the final output can follow the caller's order.
    const resultsByIndex = new Map<number, R>();

    let pending: PendingEntry<T>[] = [];
    items.forEach((item, index) => {
      const domain = DomainRateLimiter.hostnameOf(item.url);
      if (domain === null) {
        // An unparseable URL can never be fetched; make the skip visible instead of silent.
        batchLogger.info('Skipping item with invalid URL', { item_id: item.id, url: item.url });
        return;
      }
      pending.push({ index, item, domain });
    });

    while (pending.length > 0) {
      const now = Date.now();
      const { selected, deferred } = this.selectReady(pending, now, maxConcurrent);
      pending = deferred;

      if (selected.length === 0) {
        // An empty round with a free slot means every pending domain is still cooling down.
        const nextReady = this.msUntilNextReady(pending, now);
        const waitMs = Math.max(500, nextReady);
        batchLogger.debug('Waiting for domain cooldown', { wait_time_ms: waitMs });
        await step.sleep(`waiting for domain cooldown (${Math.round(nextReady / 1000)}s)`, waitMs);
        continue;
      }

      batchLogger.debug('Processing batch', { batch_size: selected.length, remaining: pending.length });

      // Run the round in parallel. Failures are logged and left out of the results rather
      // than aborting the other items in the round.
      await Promise.allSettled(
        selected.map(async ({ index, item, domain }) => {
          this.lastDomainAccess.set(domain, Date.now());
          try {
            resultsByIndex.set(index, await processItem(item, domain));
          } catch (error) {
            const itemLogger = batchLogger.child({ item_id: item.id });
            itemLogger.error(
              'Error processing item',
              undefined,
              error instanceof Error ? error : new Error(String(error))
            );
          }
        })
      );

      // Apply global cooldown between batches if we have more items to process
      if (pending.length > 0) {
        batchLogger.debug('Applying global rate limit', { cooldown_ms: this.options.globalCooldownMs });
        await step.sleep(
          `global rate limit (${Math.round(this.options.globalCooldownMs / 1000)}s)`,
          this.options.globalCooldownMs
        );
      }
    }

    // Keys are input indices; sorting restores the caller's order.
    const results = [...resultsByIndex.entries()]
      .sort(([a], [b]) => a - b)
      .map(([, value]) => value);

    batchLogger.info('Batch processing complete', { processed_count: results.length });
    return results;
  }

  /** Returns the hostname of `url`, or `null` when the `URL` constructor rejects it. */
  private static hostnameOf(url: string): string | null {
    try {
      return new URL(url).hostname;
    } catch {
      return null;
    }
  }

  /** True when `domain` was last accessed at least `domainCooldownMs` before `now` (or never). */
  private isDomainReady(domain: string, now: number): boolean {
    const lastAccess = this.lastDomainAccess.get(domain) ?? 0;
    return now - lastAccess >= this.options.domainCooldownMs;
  }

  /**
   * Splits `pending` into the entries to run this round and the rest, preserving order in both.
   * An entry is selected when the round has room and its domain's cooldown has elapsed. When a
   * positive cooldown is configured, it is also skipped if another entry for the same domain is
   * already in this round, because otherwise they would all hit that domain at once.
   */
  private selectReady(
    pending: readonly PendingEntry<T>[],
    now: number,
    maxConcurrent: number
  ): { selected: PendingEntry<T>[]; deferred: PendingEntry<T>[] } {
    const onePerDomain = this.options.domainCooldownMs > 0;
    const domainsInRound = new Set<string>();
    const selected: PendingEntry<T>[] = [];
    const deferred: PendingEntry<T>[] = [];

    for (const entry of pending) {
      const canRun =
        selected.length < maxConcurrent &&
        this.isDomainReady(entry.domain, now) &&
        !(onePerDomain && domainsInRound.has(entry.domain));
      if (canRun) {
        selected.push(entry);
        domainsInRound.add(entry.domain);
      } else {
        deferred.push(entry);
      }
    }
    return { selected, deferred };
  }

  /**
   * Milliseconds until the earliest pending domain leaves its cooldown. Only domains still
   * cooling down are counted. Returns 0 if there are none; the caller clamps the wait to a
   * minimum sleep, so it is never `Infinity`.
   */
  private msUntilNextReady(pending: readonly PendingEntry<T>[], now: number): number {
    let soonest = Number.POSITIVE_INFINITY;
    for (const { domain } of pending) {
      const lastAccess = this.lastDomainAccess.get(domain) ?? 0;
      const wait = this.options.domainCooldownMs - (now - lastAccess);
      if (wait > 0 && wait < soonest) soonest = wait;
    }
    return Number.isFinite(soonest) ? soonest : 0;
  }
}
|