web_reader / build /cloud-functions /data-crunching.js
Mohammad Shahid
Include pre-built files for HF deployment
f316cce
"use strict";
// TypeScript-emitted decorator helper (an existing `this.__decorate` is reused if present).
// Applies `decorators` from last to first. Behaviour depends on how many arguments are passed:
// - 2 arguments: class decoration. Each decorator receives the current value and may return a replacement.
// - 3 arguments: each decorator is called as d(target, key).
// - 4 arguments: member decoration. Each decorator receives (target, key, descriptor), starting from
//   `desc` (or the current property descriptor when `desc` is null). The final descriptor is then
//   defined on `target`.
// Falsy entries in `decorators` are skipped. `Reflect.decorate` is used instead when it exists.
var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) {
    var c = arguments.length, r = c < 3 ? target : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc, d;
    if (typeof Reflect === "object" && typeof Reflect.decorate === "function") r = Reflect.decorate(decorators, target, key, desc);
    else for (var i = decorators.length - 1; i >= 0; i--) if (d = decorators[i]) r = (c < 3 ? d(r) : c > 3 ? d(target, key, r) : d(target, key)) || r;
    return c > 3 && r && Object.defineProperty(target, key, r), r;
};
var __metadata = (this && this.__metadata) || function (k, v) {
    // Build a metadata decorator only when a Reflect metadata polyfill is installed.
    // Otherwise yield `undefined`, which __decorate skips as a falsy entry.
    const reflectMetadata = typeof Reflect === "object" ? Reflect.metadata : undefined;
    return typeof reflectMetadata === "function" ? Reflect.metadata(k, v) : undefined;
};
var __param = (this && this.__param) || function (paramIndex, decorator) {
    // Adapt a parameter decorator to the (target, key) call shape used by __decorate,
    // with the parameter position fixed to `paramIndex`.
    const boundParamDecorator = function (target, key) {
        decorator(target, key, paramIndex);
    };
    return boundParamDecorator;
};
var __importDefault = (this && this.__importDefault) || function (mod) {
    // Modules compiled from ES modules already expose `default`. Plain CommonJS
    // exports are wrapped so that `.default` refers to the whole module.
    const isEsModule = Boolean(mod && mod.__esModule);
    if (isEsModule) {
        return mod;
    }
    return { "default": mod };
};
// Scratch variables used by the emitted `design:paramtypes` metadata at the bottom of the file.
var _a, _b, _c;
// Flag this CommonJS module as transpiled from an ES module. Interop helpers such as
// __importDefault (above) check this flag and return the module object as-is.
Object.defineProperty(exports, "__esModule", { value: true });
// Pre-declare the export; it is assigned once the class has been defined and decorated.
exports.DataCrunchingHost = void 0;
const civkit_1 = require("civkit");
const tsyringe_1 = require("tsyringe");
const shared_1 = require("../shared");
const lodash_1 = __importDefault(require("lodash"));
const crawler_1 = require("../api/crawler");
const crawled_1 = require("../db/crawled");
const dayjs_1 = __importDefault(require("dayjs"));
const fs_1 = require("fs");
const promises_1 = require("fs/promises");
const zlib_1 = require("zlib");
const functions_1 = require("firebase-admin/functions");
const snapshot_formatter_1 = require("../services/snapshot-formatter");
const get_function_url_1 = require("../utils/get-function-url");
dayjs_1.default.extend(require('dayjs/plugin/utc'));
/**
 * Host for the page-cache "crunching" jobs.
 *
 * Crawled records are grouped by UTC day into fixed-size batches. Each record's stored
 * snapshot is rendered, and every batch is uploaded to object storage as a gzipped JSONL
 * file under `<prefix>/r<rev>/<YYYY-MM-DD>/<offset>.jsonl.gz`.
 */
let DataCrunchingHost = class DataCrunchingHost extends civkit_1.RPCHost {
    /**
     * @param globalLogger Root logger. A child logger tagged with this class name is derived from it.
     * @param crawler Injected CrawlerHost. It is stored, but left out of the arguments forwarded to RPCHost.
     * @param snapshotFormatter Renders snapshots ('default', falling back to 'markdown').
     * @param tempFileManager Allocates the local file each batch is written to.
     * @param firebaseObjectStorage Storage control used to download snapshots and upload results.
     */
    constructor(globalLogger, crawler, snapshotFormatter, tempFileManager, firebaseObjectStorage) {
        super(...lodash_1.default.without(arguments, crawler));
        this.globalLogger = globalLogger;
        this.crawler = crawler;
        this.snapshotFormatter = snapshotFormatter;
        this.tempFileManager = tempFileManager;
        this.firebaseObjectStorage = firebaseObjectStorage;
        this.logger = this.globalLogger.child({ service: this.constructor.name });
        // Object-storage prefix for crunched output files.
        this.pageCacheCrunchingPrefix = 'crunched-pages';
        // Number of Crawled records per output file (and the offset step within a day).
        this.pageCacheCrunchingBatchSize = 5000;
        // The crunching window starts at the UTC day this many ms ago (6 days).
        this.pageCacheCrunchingTMinus = 6 * 24 * 60 * 60 * 1000;
        // Output revision. It is part of every object path, so bumping it makes existing files
        // invisible to the "already crunched" checks below.
        this.rev = 7;
    }
    async init() {
        await this.dependencyReady();
        this.emit('ready');
    }
    // @CloudTaskV2({
    //     runtime: {
    //         cpu: 2,
    //         memory: '4GiB',
    //         timeoutSeconds: 3600,
    //         concurrency: 2,
    //         maxInstances: 200,
    //         retryConfig: {
    //             maxAttempts: 3,
    //             minBackoffSeconds: 60,
    //         },
    //         rateLimits: {
    //             maxConcurrentDispatches: 150,
    //             maxDispatchesPerSecond: 2,
    //         },
    //     },
    //     tags: ['DataCrunching'],
    // })
    /**
     * Crunches every batch yielded by `iterPageCacheRecords(date, offset)`.
     * Each batch is rendered to a local JSONL file, gzipped while streaming, and saved
     * to object storage under the batch's file name.
     *
     * @param {string} [date] Day to crunch (see iterPageCacheRecords).
     * @param {number} [offset] Record offset within the day.
     * @returns {Promise<boolean>} `true` once every yielded batch has been uploaded.
     */
    async crunchPageCacheWorker(date, offset) {
        this.logger.info(`Crunching page cache @${date}+${offset}...`);
        for await (const { fileName, records } of this.iterPageCacheRecords(date, offset)) {
            this.logger.info(`Crunching ${fileName}...`);
            const fileOnDrive = await this.crunchCacheRecords(records);
            const fstream = (0, fs_1.createReadStream)(fileOnDrive.path);
            const gzipStream = (0, zlib_1.createGzip)();
            // pipe() does not forward source errors to the destination. Destroy the gzip
            // stream with the read error so the upload below fails instead of waiting forever.
            fstream.on('error', (err) => gzipStream.destroy(err));
            fstream.pipe(gzipStream, { end: true });
            await this.firebaseObjectStorage.bucket.file(fileName).save(gzipStream, {
                contentType: 'application/jsonl+gzip',
            });
        }
        this.logger.info(`Crunching page cache @${date}+${offset} done.`);
        return true;
    }
    // @CloudScheduleV2('2 0 * * *', {
    //     name: 'crunchPageCacheEveryday',
    //     runtime: {
    //         cpu: 2,
    //         memory: '4GiB',
    //         timeoutSeconds: 1800,
    //         timeZone: 'UTC',
    //         retryCount: 3,
    //         minBackoffSeconds: 60,
    //     },
    //     tags: ['DataCrunching'],
    // })
    /**
     * Enqueues one `crunchPageCacheWorker` task for every pending chunk found by
     * `iterPageCacheChunks`.
     *
     * @returns {Promise<boolean>} `true` once every chunk has been enqueued.
     */
    async dispatchPageCacheCrunching() {
        // The worker URL does not change between chunks. Resolve it once, on first use.
        let workerUri;
        for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
            this.logger.info(`Dispatching ${fileName}...`);
            if (!workerUri) {
                workerUri = await (0, get_function_url_1.getFunctionUrl)('crunchPageCacheWorker');
            }
            await (0, functions_1.getFunctions)().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
                dispatchDeadlineSeconds: 1800,
                uri: workerUri,
            });
        }
        return true;
    }
    // @CloudHTTPv2({
    //     runtime: {
    //         cpu: 2,
    //         memory: '4GiB',
    //         timeoutSeconds: 3600,
    //         concurrency: 2,
    //         maxInstances: 200,
    //     },
    //     tags: ['DataCrunching'],
    // })
    // async dispatchPageCacheCrunching(
    //     @RPCReflect() rpcReflect: RPCReflection
    // ) {
    //     const sse = new OutputServerEventStream({ highWaterMark: 4096 });
    //     rpcReflect.return(sse);
    //     rpcReflect.catch((err) => {
    //         sse.end({ data: `Error: ${err.message}` });
    //     });
    //     for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
    //         this.logger.info(`Dispatching ${fileName}...`);
    //         sse.write({ data: `Dispatching ${fileName}...` });
    //         await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
    //             dispatchDeadlineSeconds: 1800,
    //             uri: await getFunctionUrl('crunchPageCacheWorker'),
    //         });
    //     }
    //     sse.end({ data: 'done' });
    //     return true;
    // }
    /**
     * Walks Crawled records one batch at a time and yields each batch whose output
     * file does not exist yet, together with the object path it should be saved to.
     *
     * Without `date`, the walk starts at the UTC day `pageCacheCrunchingTMinus` ago and
     * moves through every day before today. With `date`, it starts at that UTC day and
     * stops as soon as a batch comes back empty. After yielding a batch whose offset is
     * non-zero, it stops.
     *
     * NOTE(review): offset 0 does not trigger that stop. A call for (date, 0) therefore keeps
     * going through the rest of the day, overlapping chunks that iterPageCacheChunks dispatches
     * separately. Likewise, when the requested chunk's file already exists, the walk moves on
     * to the next chunk instead of stopping. Confirm whether both behaviours are intended.
     *
     * @param {string} [date] Day to process; parsed by dayjs and truncated to the UTC day.
     * @param {number|string} [inputOffset] Record offset within the day to start from.
     */
    async *iterPageCacheRecords(date, inputOffset) {
        const startOfToday = (0, dayjs_1.default)().utc().startOf('day');
        const startingPoint = (0, dayjs_1.default)().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day');
        let theDay = startingPoint;
        if (date) {
            theDay = (0, dayjs_1.default)(date).utc().startOf('day');
        }
        let counter = 0;
        if (inputOffset) {
            counter = parseInt(inputOffset, 10);
        }
        while (theDay.isBefore(startOfToday)) {
            const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`;
            const offset = counter;
            counter += this.pageCacheCrunchingBatchSize;
            const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0];
            if (fileExists) {
                continue;
            }
            const records = await crawled_1.Crawled.fromFirestoreQuery(crawled_1.Crawled.COLLECTION
                .where('createdAt', '>=', theDay.toDate())
                .where('createdAt', '<', theDay.add(1, 'day').toDate())
                .orderBy('createdAt', 'asc')
                .offset(offset)
                .limit(this.pageCacheCrunchingBatchSize));
            this.logger.info(`Found ${records.length} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter });
            if (!records.length) {
                if (date) {
                    break;
                }
                theDay = theDay.add(1, 'day');
                counter = 0;
                continue;
            }
            yield { fileName, records };
            if (offset) {
                break;
            }
        }
    }
    /**
     * Scans the crunching window in `pageCacheCrunchingBatchSize` chunks. The window runs
     * from the UTC day `pageCacheCrunchingTMinus` ago up to, but excluding, today. Every
     * chunk that has records but no output file yet is yielded.
     *
     * Each yielded `date` is the ISO timestamp of the start of the UTC day the chunk belongs
     * to. It is paired with that chunk's `offset` and `fileName`.
     */
    async *iterPageCacheChunks() {
        const startOfToday = (0, dayjs_1.default)().utc().startOf('day');
        const startingPoint = (0, dayjs_1.default)().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day');
        let theDay = startingPoint;
        let counter = 0;
        while (theDay.isBefore(startOfToday)) {
            const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`;
            const offset = counter;
            counter += this.pageCacheCrunchingBatchSize;
            const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0];
            if (fileExists) {
                continue;
            }
            const nRecords = (await crawled_1.Crawled.COLLECTION
                .where('createdAt', '>=', theDay.toDate())
                .where('createdAt', '<', theDay.add(1, 'day').toDate())
                .orderBy('createdAt', 'asc')
                .offset(offset)
                .limit(this.pageCacheCrunchingBatchSize)
                .count().get()).data().count;
            this.logger.info(`Found ${nRecords} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter });
            // Capture the chunk's own day BEFORE `theDay` may be advanced below. Previously the
            // last (partial) chunk of each day was dispatched with the following day's date.
            const chunkDate = theDay.toISOString();
            if (nRecords < this.pageCacheCrunchingBatchSize) {
                theDay = theDay.add(1, 'day');
                counter = 0;
            }
            if (nRecords) {
                yield { fileName, date: chunkDate, offset };
            }
        }
    }
    /**
     * Renders each record's stored snapshot (`snapshots/<_id>`) into one JSONL line and
     * writes all lines to a freshly allocated local file. Up to 30 snapshots are processed
     * concurrently. Records that fail to download or render are logged and skipped.
     *
     * @param records Crawled records; only `_id` is read from each one.
     * @returns {Promise<{ path: string }>} Handle to the local file, bound to the temp-file manager.
     */
    async crunchCacheRecords(records) {
        const throttle = new civkit_1.PromiseThrottle(30);
        const localFilePath = this.tempFileManager.alloc();
        // Create the file up front so callers can always open it, even if every record fails.
        await (0, promises_1.appendFile)(localFilePath, '', { encoding: 'utf-8' });
        // Serialize appends. Concurrent appendFile calls on one path are unsafe: a large line
        // may be written in several chunks and interleave with other lines.
        let writeChain = Promise.resolve();
        const appendLine = (line) => {
            const written = writeChain.then(() => (0, promises_1.appendFile)(localFilePath, line, { encoding: 'utf-8' }));
            // Keep the chain usable after a failed write; the failure surfaces through `written`.
            writeChain = written.catch(() => undefined);
            return written;
        };
        for (const record of records) {
            await throttle.acquire();
            this.firebaseObjectStorage.downloadFile(`snapshots/${record._id}`)
                .then(async (snapshotTxt) => {
                try {
                    const snapshot = JSON.parse(snapshotTxt.toString('utf-8'));
                    let formatted = await this.snapshotFormatter.formatSnapshot('default', snapshot);
                    if (!formatted.content) {
                        formatted = await this.snapshotFormatter.formatSnapshot('markdown', snapshot);
                    }
                    await appendLine(JSON.stringify({
                        url: snapshot.href,
                        title: snapshot.title || '',
                        html: snapshot.html || '',
                        text: snapshot.text || '',
                        content: formatted.content || '',
                    }) + '\n');
                }
                catch (err) {
                    this.logger.warn(`Failed to parse snapshot for ${record._id}`, { err });
                }
            }, (err) => {
                // Without this handler a failed download left the chain rejected and unhandled.
                this.logger.warn(`Failed to download snapshot for ${record._id}`, { err });
            })
                .finally(() => {
                throttle.release();
            });
        }
        await throttle.nextDrain();
        // Every task awaits its own append before releasing; this makes the guarantee explicit.
        await writeChain;
        const ro = {
            path: localFilePath
        };
        this.tempFileManager.bindPathTo(ro, localFilePath);
        return ro;
    }
};
// Publish the class. The export is reassigned below once the class decorators have run.
exports.DataCrunchingHost = DataCrunchingHost;
// Parameter decorators for crunchPageCacheWorker: parameter 0 is bound via shared Param('date'),
// and parameter 1 via shared Param('offset', { default: 0 }). Design-time type metadata is also
// recorded (applied only if a Reflect metadata polyfill is present).
__decorate([
    __param(0, (0, shared_1.Param)('date')),
    __param(1, (0, shared_1.Param)('offset', { default: 0 })),
    __metadata("design:type", Function),
    __metadata("design:paramtypes", [String, Number]),
    __metadata("design:returntype", Promise)
], DataCrunchingHost.prototype, "crunchPageCacheWorker", null);
// Register the class as a tsyringe singleton and record its constructor parameter types.
// The `typeof … === "function" ? … : Object` guards fall back to `Object` when an imported
// type is not available as a runtime value.
exports.DataCrunchingHost = DataCrunchingHost = __decorate([
    (0, tsyringe_1.singleton)(),
    __metadata("design:paramtypes", [typeof (_a = typeof shared_1.Logger !== "undefined" && shared_1.Logger) === "function" ? _a : Object, crawler_1.CrawlerHost,
        snapshot_formatter_1.SnapshotFormatter, typeof (_b = typeof shared_1.TempFileManager !== "undefined" && shared_1.TempFileManager) === "function" ? _b : Object, typeof (_c = typeof shared_1.FirebaseStorageBucketControl !== "undefined" && shared_1.FirebaseStorageBucketControl) === "function" ? _c : Object])
], DataCrunchingHost);
//# sourceMappingURL=data-crunching.js.map