nomagick committed on
Commit
3b3a026
·
unverified ·
1 Parent(s): c064fcf

feat: control concurrent request per page instead of server bucket

Browse files
Files changed (1) hide show
  1. src/services/puppeteer.ts +73 -43
src/services/puppeteer.ts CHANGED
@@ -1,21 +1,26 @@
 
 
 
1
  import fs from 'fs';
2
  import { container, singleton } from 'tsyringe';
3
- import { AsyncService, Defer, marshalErrorLike, AssertionFailureError, delay, Deferred, perNextTick, ParamValidationError, FancyFile } from 'civkit';
4
- import { GlobalLogger } from './logger';
5
 
6
- import type { Browser, CookieParam, GoToOptions, HTTPResponse, Page, Viewport } from 'puppeteer';
7
  import type { Cookie } from 'set-cookie-parser';
8
  import puppeteer from 'puppeteer-extra';
 
 
 
 
 
 
 
9
 
10
  import puppeteerBlockResources from 'puppeteer-extra-plugin-block-resources';
11
  import { SecurityCompromiseError, ServiceCrashedError, ServiceNodeResourceDrainError } from '../shared/lib/errors';
12
- import { TimeoutError } from 'puppeteer';
13
- import _ from 'lodash';
14
- import { isIP } from 'net';
15
  import { CurlControl } from './curl';
16
- import { readFile } from 'fs/promises';
17
  import { BlackHoleDetector } from './blackhole-detector';
18
  import { AsyncLocalContext } from './async-context';
 
19
  const tldExtract = require('tld-extract');
20
 
21
  const READABILITY_JS = fs.readFileSync(require.resolve('@mozilla/readability/Readability.js'), 'utf-8');
@@ -440,6 +445,39 @@ window.briefImgs = briefImgs;
440
  })();
441
  `;
442
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
443
  @singleton()
444
  export class PuppeteerControl extends AsyncService {
445
 
@@ -447,8 +485,6 @@ export class PuppeteerControl extends AsyncService {
447
  browser!: Browser;
448
  logger = this.globalLogger.child({ service: this.constructor.name });
449
 
450
- private __reqCapInterval?: NodeJS.Timeout;
451
-
452
  __loadedPage: Page[] = [];
453
 
454
  finalizerMap = new WeakMap<Page, ReturnType<typeof setTimeout>>();
@@ -458,9 +494,10 @@ export class PuppeteerControl extends AsyncService {
458
  lastPageCratedAt: number = 0;
459
  ua: string = '';
460
 
461
- rpsCap: number = 500;
 
 
462
  lastReqSentAt: number = 0;
463
- requestDeferredQueue: Deferred<boolean>[] = [];
464
 
465
  circuitBreakerHosts: Set<string> = new Set();
466
 
@@ -490,10 +527,6 @@ export class PuppeteerControl extends AsyncService {
490
  }
491
 
492
  override async init() {
493
- if (this.__reqCapInterval) {
494
- clearInterval(this.__reqCapInterval);
495
- this.__reqCapInterval = undefined;
496
- }
497
  await this.dependencyReady();
498
  if (process.env.NODE_ENV?.includes('dry-run')) {
499
  this.emit('ready');
@@ -536,22 +569,14 @@ export class PuppeteerControl extends AsyncService {
536
  this.emit('ready');
537
  }
538
 
539
- @perNextTick()
540
- reqCapRoutine() {
541
- const now = Date.now();
542
- const numToPass = Math.round((now - this.lastReqSentAt) / 1000 * this.rpsCap);
543
- this.requestDeferredQueue.splice(0, numToPass).forEach((x) => x.resolve(true));
544
- if (numToPass) {
545
- this.lastReqSentAt = now;
546
- }
547
- if (!this.requestDeferredQueue.length) {
548
- if (this.__reqCapInterval) {
549
- clearInterval(this.__reqCapInterval);
550
- this.__reqCapInterval = undefined;
551
- }
552
- } else if (!this.__reqCapInterval) {
553
- this.__reqCapInterval = setInterval(() => this.reqCapRoutine(), 1000 / this.rpsCap).unref();
554
  }
 
 
555
  }
556
 
557
  async newPage(bewareDeadLock: any = false) {
@@ -564,7 +589,7 @@ export class PuppeteerControl extends AsyncService {
564
  const dedicatedContext = await this.browser.createBrowserContext();
565
  page = await dedicatedContext.newPage();
566
  } catch (err: any) {
567
- this.logger.warn(`Failed to create page ${sn}`, { err: marshalErrorLike(err) });
568
  this.browser.process()?.kill('SIGKILL');
569
  throw new ServiceNodeResourceDrainError(`This specific worker node failed to open a new page, try again.`);
570
  }
@@ -661,10 +686,8 @@ export class PuppeteerControl extends AsyncService {
661
  }
662
 
663
  if (requestUrl.startsWith('http')) {
664
- const d = Defer();
665
- this.requestDeferredQueue.push(d);
666
- this.reqCapRoutine();
667
- await d.promise;
668
  }
669
 
670
  if (req.isInterceptResolutionHandled()) {
@@ -677,6 +700,13 @@ export class PuppeteerControl extends AsyncService {
677
 
678
  return req.continue(continueArgs[0], continueArgs[1]);
679
  });
 
 
 
 
 
 
 
680
 
681
  await page.evaluateOnNewDocument(`
682
  (function () {
@@ -721,7 +751,7 @@ export class PuppeteerControl extends AsyncService {
721
  this.newPage()
722
  .then((r) => this.__loadedPage.push(r))
723
  .catch((err) => {
724
- this.logger.warn(`Failed to load new page ahead of time`, { err: marshalErrorLike(err) });
725
  });
726
  }
727
  }
@@ -761,7 +791,7 @@ export class PuppeteerControl extends AsyncService {
761
  })(),
762
  delay(5000)
763
  ]).catch((err) => {
764
- this.logger.error(`Failed to destroy page ${sn}`, { err: marshalErrorLike(err) });
765
  });
766
  this.livePages.delete(page);
767
  this.pagePhase.delete(page);
@@ -997,7 +1027,7 @@ export class PuppeteerControl extends AsyncService {
997
  try {
998
  await page.setCookie(...mapped);
999
  } catch (err: any) {
1000
- this.logger.warn(`Page ${sn}: Failed to set cookies`, { err: marshalErrorLike(err) });
1001
  throw new ParamValidationError({
1002
  path: 'cookies',
1003
  message: `Failed to set cookies: ${err?.message}`
@@ -1062,7 +1092,7 @@ export class PuppeteerControl extends AsyncService {
1062
  const gotoPromise = page.goto(url, goToOptions)
1063
  .catch((err) => {
1064
  if (err instanceof TimeoutError) {
1065
- this.logger.warn(`Page ${sn}: Browsing of ${url} timed out`, { err: marshalErrorLike(err) });
1066
  return new AssertionFailureError({
1067
  message: `Failed to goto ${url}: ${err}`,
1068
  cause: err,
@@ -1075,7 +1105,7 @@ export class PuppeteerControl extends AsyncService {
1075
  }
1076
  }
1077
 
1078
- this.logger.warn(`Page ${sn}: Browsing of ${url} failed`, { err: marshalErrorLike(err) });
1079
  return new AssertionFailureError({
1080
  message: `Failed to goto ${url}: ${err}`,
1081
  cause: err,
@@ -1126,7 +1156,7 @@ export class PuppeteerControl extends AsyncService {
1126
  // }
1127
  // }
1128
  // } catch (err: any) {
1129
- // this.logger.warn(`Page ${sn}: Failed to salvage ${url}`, { err: marshalErrorLike(err) });
1130
  // }
1131
 
1132
  finalized = true;
@@ -1166,7 +1196,7 @@ export class PuppeteerControl extends AsyncService {
1166
  finalized = true;
1167
  })
1168
  .catch((err) => {
1169
- this.logger.warn(`Page ${sn}: Failed to wait for selector ${options.waitForSelector}`, { err: marshalErrorLike(err) });
1170
  waitForPromise = undefined;
1171
  });
1172
  return p as any;
@@ -1243,7 +1273,7 @@ export class PuppeteerControl extends AsyncService {
1243
  // }
1244
 
1245
  // await page.goto(googleArchiveUrl, { waitUntil: ['load', 'domcontentloaded', 'networkidle0'], timeout: 15_000 }).catch((err) => {
1246
- // this.logger.warn(`Page salvation did not fully succeed.`, { err: marshalErrorLike(err) });
1247
  // });
1248
 
1249
  // this.logger.info(`Salvation completed.`);
 
1
+ import _ from 'lodash';
2
+ import { isIP } from 'net';
3
+ import { readFile } from 'fs/promises';
4
  import fs from 'fs';
5
  import { container, singleton } from 'tsyringe';
 
 
6
 
7
+ import type { Browser, CookieParam, GoToOptions, HTTPRequest, HTTPResponse, Page, Viewport } from 'puppeteer';
8
  import type { Cookie } from 'set-cookie-parser';
9
  import puppeteer from 'puppeteer-extra';
10
+ import { TimeoutError } from 'puppeteer';
11
+
12
+ import { Defer, Deferred } from 'civkit/defer';
13
+ import { AssertionFailureError, ParamValidationError } from 'civkit/civ-rpc';
14
+ import { AsyncService } from 'civkit/async-service';
15
+ import { FancyFile } from 'civkit/fancy-file';
16
+ import { delay } from 'civkit/timeout';
17
 
18
  import puppeteerBlockResources from 'puppeteer-extra-plugin-block-resources';
19
  import { SecurityCompromiseError, ServiceCrashedError, ServiceNodeResourceDrainError } from '../shared/lib/errors';
 
 
 
20
  import { CurlControl } from './curl';
 
21
  import { BlackHoleDetector } from './blackhole-detector';
22
  import { AsyncLocalContext } from './async-context';
23
+ import { GlobalLogger } from './logger';
24
  const tldExtract = require('tld-extract');
25
 
26
  const READABILITY_JS = fs.readFileSync(require.resolve('@mozilla/readability/Readability.js'), 'utf-8');
 
445
  })();
446
  `;
447
 
448
/**
 * Per-page gate limiting how many intercepted requests may proceed at once.
 *
 * A request is registered with `onNewRequest()`, whose returned promise
 * settles once the request may continue, and is released with
 * `onFinishRequest()` when it ends. Requests over the limit are queued and
 * admitted in arrival order as slots free up.
 *
 * Bookkeeping:
 *  - `reqSet` holds every tracked request (admitted and queued alike);
 *  - `queued` / `blockers` hold only the requests still waiting for a slot;
 *  - the number of requests actually holding a slot is therefore
 *    `reqSet.size - queued.size`.
 */
class PageReqCtrlKit {
    /** Every request currently tracked: those holding a slot plus those still queued. */
    reqSet: Set<HTTPRequest> = new Set();
    /** Deferreds of the queued requests, oldest first. Kept in sync with `queued`. */
    blockers: Deferred<void>[] = [];
    /** Queued request -> its blocker, in arrival order (Map preserves insertion order). */
    private queued: Map<HTTPRequest, Deferred<void>> = new Map();

    /**
     * @param concurrency Maximum number of requests allowed to proceed simultaneously.
     * @throws AssertionFailureError when `concurrency` is NaN or less than 1.
     */
    constructor(
        public concurrency: number,
    ) {
        if (isNaN(concurrency) || concurrency < 1) {
            throw new AssertionFailureError(`Invalid concurrency: ${concurrency}`);
        }
    }

    /** Number of tracked requests that currently hold a slot. */
    private get inFlight(): number {
        return this.reqSet.size - this.queued.size;
    }

    /** Removes a queued request from both queue structures and lets its waiter continue. */
    private dequeue(req: HTTPRequest, blocker: Deferred<void>) {
        this.queued.delete(req);
        const idx = this.blockers.indexOf(blocker);
        if (idx !== -1) {
            this.blockers.splice(idx, 1);
        }
        blocker.resolve();
    }

    /**
     * Registers a request and returns a promise that settles when it may proceed.
     *
     * @param req The request asking for a slot.
     * @returns An already-resolved promise when a slot is free, otherwise a
     *          promise resolved later by `onFinishRequest()`.
     */
    onNewRequest(req: HTTPRequest): Promise<void> {
        if (this.reqSet.has(req)) {
            // Already tracked: hand back the existing wait (if any) instead of
            // queueing a second blocker for the same request.
            return this.queued.get(req)?.promise ?? Promise.resolve();
        }
        this.reqSet.add(req);
        // `inFlight` already counts this request, hence `<=`.
        if (this.inFlight <= this.concurrency) {
            return Promise.resolve();
        }
        const deferred = Defer();
        this.blockers.push(deferred);
        this.queued.set(req, deferred);

        return deferred.promise;
    }

    /**
     * Releases the slot held by a request and admits queued requests into
     * whatever capacity became available.
     *
     * Calls for requests that are not tracked (never registered, or already
     * released by an earlier call) are ignored, so several completion
     * notifications for one request free at most one slot.
     *
     * @param req The request that finished, failed or was otherwise resolved.
     */
    onFinishRequest(req: HTTPRequest) {
        if (!this.reqSet.delete(req)) {
            // Not tracked: nothing was occupied, so nothing may be released.
            return;
        }
        const ownBlocker = this.queued.get(req);
        if (ownBlocker) {
            // The request ended while still queued, so it never held a slot.
            // Drop it from the queue and unblock its waiter; capacity is unchanged.
            this.dequeue(req, ownBlocker);

            return;
        }
        // A slot was freed: admit waiters, oldest first, until capacity is full.
        // Removing the current entry while iterating a Map is safe.
        for (const [nextReq, blocker] of this.queued) {
            if (this.inFlight >= this.concurrency) {
                break;
            }
            this.dequeue(nextReq, blocker);
        }
    }
}
480
+
481
  @singleton()
482
  export class PuppeteerControl extends AsyncService {
483
 
 
485
  browser!: Browser;
486
  logger = this.globalLogger.child({ service: this.constructor.name });
487
 
 
 
488
  __loadedPage: Page[] = [];
489
 
490
  finalizerMap = new WeakMap<Page, ReturnType<typeof setTimeout>>();
 
494
  lastPageCratedAt: number = 0;
495
  ua: string = '';
496
 
497
+ concurrentRequestsPerPage: number = 16;
498
+ pageReqCtrl = new WeakMap<Page, PageReqCtrlKit>();
499
+
500
  lastReqSentAt: number = 0;
 
501
 
502
  circuitBreakerHosts: Set<string> = new Set();
503
 
 
527
  }
528
 
529
  override async init() {
 
 
 
 
530
  await this.dependencyReady();
531
  if (process.env.NODE_ENV?.includes('dry-run')) {
532
  this.emit('ready');
 
569
  this.emit('ready');
570
  }
571
 
572
/**
 * Returns the request-control kit associated with `page`, creating one
 * with `concurrentRequestsPerPage` as its limit and storing it in
 * `pageReqCtrl` on first use.
 *
 * @param page The page whose kit is wanted.
 * @returns The kit stored for that page.
 */
protected getRpsControlKit(page: Page) {
    const existing = this.pageReqCtrl.get(page);
    if (existing) {
        return existing;
    }

    const created = new PageReqCtrlKit(this.concurrentRequestsPerPage);
    this.pageReqCtrl.set(page, created);

    return created;
}
581
 
582
  async newPage(bewareDeadLock: any = false) {
 
589
  const dedicatedContext = await this.browser.createBrowserContext();
590
  page = await dedicatedContext.newPage();
591
  } catch (err: any) {
592
+ this.logger.warn(`Failed to create page ${sn}`, { err });
593
  this.browser.process()?.kill('SIGKILL');
594
  throw new ServiceNodeResourceDrainError(`This specific worker node failed to open a new page, try again.`);
595
  }
 
686
  }
687
 
688
  if (requestUrl.startsWith('http')) {
689
+ const kit = this.getRpsControlKit(page);
690
+ await kit.onNewRequest(req);
 
 
691
  }
692
 
693
  if (req.isInterceptResolutionHandled()) {
 
700
 
701
  return req.continue(continueArgs[0], continueArgs[1]);
702
  });
703
+ const reqFinishHandler = (req: HTTPRequest) => {
704
+ const kit = this.getRpsControlKit(page);
705
+ kit.onFinishRequest(req);
706
+ };
707
+ page.on('requestfinished', reqFinishHandler);
708
+ page.on('requestfailed', reqFinishHandler);
709
+ page.on('requestservedfromcache', reqFinishHandler);
710
 
711
  await page.evaluateOnNewDocument(`
712
  (function () {
 
751
  this.newPage()
752
  .then((r) => this.__loadedPage.push(r))
753
  .catch((err) => {
754
+ this.logger.warn(`Failed to load new page ahead of time`, { err });
755
  });
756
  }
757
  }
 
791
  })(),
792
  delay(5000)
793
  ]).catch((err) => {
794
+ this.logger.error(`Failed to destroy page ${sn}`, { err });
795
  });
796
  this.livePages.delete(page);
797
  this.pagePhase.delete(page);
 
1027
  try {
1028
  await page.setCookie(...mapped);
1029
  } catch (err: any) {
1030
+ this.logger.warn(`Page ${sn}: Failed to set cookies`, { err });
1031
  throw new ParamValidationError({
1032
  path: 'cookies',
1033
  message: `Failed to set cookies: ${err?.message}`
 
1092
  const gotoPromise = page.goto(url, goToOptions)
1093
  .catch((err) => {
1094
  if (err instanceof TimeoutError) {
1095
+ this.logger.warn(`Page ${sn}: Browsing of ${url} timed out`, { err });
1096
  return new AssertionFailureError({
1097
  message: `Failed to goto ${url}: ${err}`,
1098
  cause: err,
 
1105
  }
1106
  }
1107
 
1108
+ this.logger.warn(`Page ${sn}: Browsing of ${url} failed`, { err });
1109
  return new AssertionFailureError({
1110
  message: `Failed to goto ${url}: ${err}`,
1111
  cause: err,
 
1156
  // }
1157
  // }
1158
  // } catch (err: any) {
1159
+ // this.logger.warn(`Page ${sn}: Failed to salvage ${url}`, { err });
1160
  // }
1161
 
1162
  finalized = true;
 
1196
  finalized = true;
1197
  })
1198
  .catch((err) => {
1199
+ this.logger.warn(`Page ${sn}: Failed to wait for selector ${options.waitForSelector}`, { err });
1200
  waitForPromise = undefined;
1201
  });
1202
  return p as any;
 
1273
  // }
1274
 
1275
  // await page.goto(googleArchiveUrl, { waitUntil: ['load', 'domcontentloaded', 'networkidle0'], timeout: 15_000 }).catch((err) => {
1276
+ // this.logger.warn(`Page salvation did not fully succeed.`, { err });
1277
  // });
1278
 
1279
  // this.logger.info(`Salvation completed.`);