coyotte508's picture
coyotte508 HF staff
πŸ› Fix export of convos (#267)
aa125df unverified
raw
history blame
4.28 kB
import {
PARQUET_EXPORT_DATASET,
PARQUET_EXPORT_HF_TOKEN,
PARQUET_EXPORT_SECRET,
} from "$env/static/private";
import { collections } from "$lib/server/database";
import type { Message } from "$lib/types/Message";
import { error } from "@sveltejs/kit";
import { pathToFileURL } from "node:url";
import { unlink } from "node:fs/promises";
import { uploadFile } from "@huggingface/hub";
import parquet from "parquetjs";
import { z } from "zod";
// Triger like this:
// curl -X POST "http://localhost:5173/chat/admin/export" -H "Authorization: Bearer <PARQUET_EXPORT_SECRET>" -H "Content-Type: application/json" -d '{"model": "OpenAssistant/oasst-sft-6-llama-30b-xor"}'
export async function POST({ request }) {
if (!PARQUET_EXPORT_SECRET || !PARQUET_EXPORT_DATASET || !PARQUET_EXPORT_HF_TOKEN) {
throw error(500, "Parquet export is not configured.");
}
if (request.headers.get("Authorization") !== `Bearer ${PARQUET_EXPORT_SECRET}`) {
throw error(403);
}
const { model } = z
.object({
model: z.string(),
})
.parse(await request.json());
const schema = new parquet.ParquetSchema({
title: { type: "UTF8" },
created_at: { type: "TIMESTAMP_MILLIS" },
updated_at: { type: "TIMESTAMP_MILLIS" },
messages: {
repeated: true,
fields: {
from: { type: "UTF8" },
content: { type: "UTF8" },
score: { type: "INT_8", optional: true },
},
},
});
const fileName = `/tmp/conversations-${new Date().toJSON().slice(0, 10)}-${Date.now()}.parquet`;
const writer = await parquet.ParquetWriter.openFile(schema, fileName);
let count = 0;
console.log("Exporting conversations for model", model);
for await (const conversation of collections.settings.aggregate<{
title: string;
created_at: Date;
updated_at: Date;
messages: Message[];
}>([
{
$match: {
shareConversationsWithModelAuthors: true,
sessionId: { $exists: true },
userId: { $exists: false },
},
},
{
$lookup: {
from: "conversations",
localField: "sessionId",
foreignField: "sessionId",
as: "conversations",
pipeline: [{ $match: { model, userId: { $exists: false } } }],
},
},
{ $unwind: "$conversations" },
{
$project: {
title: "$conversations.title",
created_at: "$conversations.createdAt",
updated_at: "$conversations.updatedAt",
messages: "$conversations.messages",
},
},
])) {
await writer.appendRow({
title: conversation.title,
created_at: conversation.created_at,
updated_at: conversation.updated_at,
messages: conversation.messages.map((message: Message) => ({
from: message.from,
content: message.content,
...(message.score ? { score: message.score } : undefined),
})),
});
++count;
if (count % 1_000 === 0) {
console.log("Exported", count, "conversations");
}
}
console.log("exporting convos with userId");
for await (const conversation of collections.settings.aggregate<{
title: string;
created_at: Date;
updated_at: Date;
messages: Message[];
}>([
{ $match: { shareConversationsWithModelAuthors: true, userId: { $exists: true } } },
{
$lookup: {
from: "conversations",
localField: "userId",
foreignField: "userId",
as: "conversations",
pipeline: [{ $match: { model } }],
},
},
{ $unwind: "$conversations" },
{
$project: {
title: "$conversations.title",
created_at: "$conversations.createdAt",
updated_at: "$conversations.updatedAt",
messages: "$conversations.messages",
},
},
])) {
await writer.appendRow({
title: conversation.title,
created_at: conversation.created_at,
updated_at: conversation.updated_at,
messages: conversation.messages.map((message: Message) => ({
from: message.from,
content: message.content,
...(message.score ? { score: message.score } : undefined),
})),
});
++count;
if (count % 1_000 === 0) {
console.log("Exported", count, "conversations");
}
}
await writer.close();
console.log("Uploading", fileName, "to Hugging Face Hub");
await uploadFile({
file: pathToFileURL(fileName),
credentials: { accessToken: PARQUET_EXPORT_HF_TOKEN },
repo: {
type: "dataset",
name: PARQUET_EXPORT_DATASET,
},
});
console.log("Upload done");
await unlink(fileName);
return new Response();
}