coyotte508 HF staff nsarrazin HF staff commited on
Commit
634bd69
1 Parent(s): 6244c37

✨ Add stats on conversations (#828)

Browse files

* ✨ Add stats on conversations

* 🩹 Fix conversation stats compute

* ⚡️ Slightly better index

* 🔊

* 🩹 Some fixes

* 🐛 Fix aggregation query

* 🐛 Fix $merge stage of query

* 🐛 Fix query on sessionId

* ✨ Compute weekly/monthly unique users

* 🩹 Final tweaks

* ⚡️ Split aggregations for better perf

No need to recompute DAUs / WAUs and so on

* ♻️ Deprecate PARQUET_EXPORT_SECRET & fix sec vuln

* Add ADMIN_API_SECRET to CD action

---------

Co-authored-by: Nathan Sarrazin <sarrazin.nathan@gmail.com>

.env CHANGED
@@ -109,7 +109,9 @@ PUBLIC_ANNOUNCEMENT_BANNERS=`[
109
 
110
  PARQUET_EXPORT_DATASET=
111
  PARQUET_EXPORT_HF_TOKEN=
112
- PARQUET_EXPORT_SECRET=
 
 
113
 
114
  RATE_LIMIT= # requests per minute
115
  MESSAGES_BEFORE_LOGIN=# how many messages a user can send in a conversation before having to login. set to 0 to force login right away
 
109
 
110
  PARQUET_EXPORT_DATASET=
111
  PARQUET_EXPORT_HF_TOKEN=
112
+ ADMIN_API_SECRET=# secret to admin API calls, like computing usage stats or exporting parquet data
113
+
114
+ PARQUET_EXPORT_SECRET=#DEPRECATED, use ADMIN_API_SECRET instead
115
 
116
  RATE_LIMIT= # requests per minute
117
  MESSAGES_BEFORE_LOGIN=# how many messages a user can send in a conversation before having to login. set to 0 to force login right away
.github/workflows/deploy-release.yml CHANGED
@@ -26,6 +26,7 @@ jobs:
26
  MONGODB_URL: ${{ secrets.MONGODB_URL }}
27
  HF_DEPLOYMENT_TOKEN: ${{ secrets.HF_DEPLOYMENT_TOKEN }}
28
  WEBHOOK_URL_REPORT_ASSISTANT: ${{ secrets.WEBHOOK_URL_REPORT_ASSISTANT }}
 
29
  run: npm run updateProdEnv
30
  sync-to-hub:
31
  runs-on: ubuntu-latest
 
26
  MONGODB_URL: ${{ secrets.MONGODB_URL }}
27
  HF_DEPLOYMENT_TOKEN: ${{ secrets.HF_DEPLOYMENT_TOKEN }}
28
  WEBHOOK_URL_REPORT_ASSISTANT: ${{ secrets.WEBHOOK_URL_REPORT_ASSISTANT }}
29
+ ADMIN_API_SECRET: ${{ secrets.ADMIN_API_SECRET }}
30
  run: npm run updateProdEnv
31
  sync-to-hub:
32
  runs-on: ubuntu-latest
package-lock.json CHANGED
@@ -4093,10 +4093,17 @@
4093
  "resolved": "https://registry.npmjs.org/int53/-/int53-0.2.4.tgz",
4094
  "integrity": "sha512-a5jlKftS7HUOhkUyYD7j2sJ/ZnvWiNlZS1ldR+g1ifQ+/UuZXIE+YTc/lK1qGj/GwAU5F8Z0e1eVq2t1J5Ob2g=="
4095
  },
4096
- "node_modules/ip": {
4097
- "version": "2.0.0",
4098
- "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz",
4099
- "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ=="
 
 
 
 
 
 
 
4100
  },
4101
  "node_modules/is-arrayish": {
4102
  "version": "0.3.2",
@@ -4267,6 +4274,11 @@
4267
  "js-yaml": "bin/js-yaml.js"
4268
  }
4269
  },
 
 
 
 
 
4270
  "node_modules/jsdom": {
4271
  "version": "22.0.0",
4272
  "resolved": "https://registry.npmjs.org/jsdom/-/jsdom-22.0.0.tgz",
@@ -6294,15 +6306,15 @@
6294
  "integrity": "sha512-YIK6I2lsH072UE0aOFxxY1dPDCS43I5ktqHpeAsuLNYWkE5pGxRGWfDM4/vSUfNzXjC1Ivzt3qx31PCLmc9yqg=="
6295
  },
6296
  "node_modules/socks": {
6297
- "version": "2.7.1",
6298
- "resolved": "https://registry.npmjs.org/socks/-/socks-2.7.1.tgz",
6299
- "integrity": "sha512-7maUZy1N7uo6+WVEX6psASxtNlKaNVMlGQKkG/63nEDdLOWNbiUMoLK7X4uYoLhQstau72mLgfEWcXcwsaHbYQ==",
6300
  "dependencies": {
6301
- "ip": "^2.0.0",
6302
  "smart-buffer": "^4.2.0"
6303
  },
6304
  "engines": {
6305
- "node": ">= 10.13.0",
6306
  "npm": ">= 3.0.0"
6307
  }
6308
  },
@@ -6345,6 +6357,11 @@
6345
  "memory-pager": "^1.0.2"
6346
  }
6347
  },
 
 
 
 
 
6348
  "node_modules/stackback": {
6349
  "version": "0.0.2",
6350
  "resolved": "https://registry.npmjs.org/stackback/-/stackback-0.0.2.tgz",
 
4093
  "resolved": "https://registry.npmjs.org/int53/-/int53-0.2.4.tgz",
4094
  "integrity": "sha512-a5jlKftS7HUOhkUyYD7j2sJ/ZnvWiNlZS1ldR+g1ifQ+/UuZXIE+YTc/lK1qGj/GwAU5F8Z0e1eVq2t1J5Ob2g=="
4095
  },
4096
+ "node_modules/ip-address": {
4097
+ "version": "9.0.5",
4098
+ "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz",
4099
+ "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==",
4100
+ "dependencies": {
4101
+ "jsbn": "1.1.0",
4102
+ "sprintf-js": "^1.1.3"
4103
+ },
4104
+ "engines": {
4105
+ "node": ">= 12"
4106
+ }
4107
  },
4108
  "node_modules/is-arrayish": {
4109
  "version": "0.3.2",
 
4274
  "js-yaml": "bin/js-yaml.js"
4275
  }
4276
  },
4277
+ "node_modules/jsbn": {
4278
+ "version": "1.1.0",
4279
+ "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz",
4280
+ "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A=="
4281
+ },
4282
  "node_modules/jsdom": {
4283
  "version": "22.0.0",
4284
  "resolved": "https://registry.npmjs.org/jsdom/-/jsdom-22.0.0.tgz",
 
6306
  "integrity": "sha512-YIK6I2lsH072UE0aOFxxY1dPDCS43I5ktqHpeAsuLNYWkE5pGxRGWfDM4/vSUfNzXjC1Ivzt3qx31PCLmc9yqg=="
6307
  },
6308
  "node_modules/socks": {
6309
+ "version": "2.7.3",
6310
+ "resolved": "https://registry.npmjs.org/socks/-/socks-2.7.3.tgz",
6311
+ "integrity": "sha512-vfuYK48HXCTFD03G/1/zkIls3Ebr2YNa4qU9gHDZdblHLiqhJrJGkY3+0Nx0JpN9qBhJbVObc1CNciT1bIZJxw==",
6312
  "dependencies": {
6313
+ "ip-address": "^9.0.5",
6314
  "smart-buffer": "^4.2.0"
6315
  },
6316
  "engines": {
6317
+ "node": ">= 10.0.0",
6318
  "npm": ">= 3.0.0"
6319
  }
6320
  },
 
6357
  "memory-pager": "^1.0.2"
6358
  }
6359
  },
6360
+ "node_modules/sprintf-js": {
6361
+ "version": "1.1.3",
6362
+ "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz",
6363
+ "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA=="
6364
+ },
6365
  "node_modules/stackback": {
6366
  "version": "0.0.2",
6367
  "resolved": "https://registry.npmjs.org/stackback/-/stackback-0.0.2.tgz",
scripts/updateProdEnv.ts CHANGED
@@ -7,6 +7,7 @@ const OPENID_CONFIG = process.env.OPENID_CONFIG;
7
  const MONGODB_URL = process.env.MONGODB_URL;
8
  const HF_TOKEN = process.env.HF_TOKEN ?? process.env.HF_ACCESS_TOKEN; // token used for API requests in prod
9
  const WEBHOOK_URL_REPORT_ASSISTANT = process.env.WEBHOOK_URL_REPORT_ASSISTANT; // slack webhook url used to get "report assistant" events
 
10
 
11
  // Read the content of the file .env.template
12
  const PUBLIC_CONFIG = fs.readFileSync(".env.template", "utf8");
@@ -18,6 +19,7 @@ OPENID_CONFIG=${OPENID_CONFIG}
18
  SERPER_API_KEY=${SERPER_API_KEY}
19
  HF_TOKEN=${HF_TOKEN}
20
  WEBHOOK_URL_REPORT_ASSISTANT=${WEBHOOK_URL_REPORT_ASSISTANT}
 
21
  `;
22
 
23
  // Make an HTTP POST request to add the space secrets
 
7
  const MONGODB_URL = process.env.MONGODB_URL;
8
  const HF_TOKEN = process.env.HF_TOKEN ?? process.env.HF_ACCESS_TOKEN; // token used for API requests in prod
9
  const WEBHOOK_URL_REPORT_ASSISTANT = process.env.WEBHOOK_URL_REPORT_ASSISTANT; // slack webhook url used to get "report assistant" events
10
+ const ADMIN_API_SECRET = process.env.ADMIN_API_SECRET;
11
 
12
  // Read the content of the file .env.template
13
  const PUBLIC_CONFIG = fs.readFileSync(".env.template", "utf8");
 
19
  SERPER_API_KEY=${SERPER_API_KEY}
20
  HF_TOKEN=${HF_TOKEN}
21
  WEBHOOK_URL_REPORT_ASSISTANT=${WEBHOOK_URL_REPORT_ASSISTANT}
22
+ ADMIN_API_SECRET=${ADMIN_API_SECRET}
23
  `;
24
 
25
  // Make an HTTP POST request to add the space secrets
src/hooks.server.ts CHANGED
@@ -1,4 +1,10 @@
1
- import { COOKIE_NAME, EXPOSE_API, MESSAGES_BEFORE_LOGIN } from "$env/static/private";
 
 
 
 
 
 
2
  import type { Handle } from "@sveltejs/kit";
3
  import {
4
  PUBLIC_GOOGLE_ANALYTICS_ID,
@@ -29,6 +35,18 @@ export const handle: Handle = async ({ event, resolve }) => {
29
  });
30
  }
31
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  const token = event.cookies.get(COOKIE_NAME);
33
 
34
  let secretSessionId: string;
 
1
+ import {
2
+ ADMIN_API_SECRET,
3
+ COOKIE_NAME,
4
+ EXPOSE_API,
5
+ MESSAGES_BEFORE_LOGIN,
6
+ PARQUET_EXPORT_SECRET,
7
+ } from "$env/static/private";
8
  import type { Handle } from "@sveltejs/kit";
9
  import {
10
  PUBLIC_GOOGLE_ANALYTICS_ID,
 
35
  });
36
  }
37
 
38
+ if (event.url.pathname.startsWith(`${base}/admin/`) || event.url.pathname === `${base}/admin`) {
39
+ const ADMIN_SECRET = ADMIN_API_SECRET || PARQUET_EXPORT_SECRET;
40
+
41
+ if (!ADMIN_SECRET) {
42
+ return errorResponse(500, "Admin API is not configured");
43
+ }
44
+
45
+ if (event.request.headers.get("Authorization") !== `Bearer ${ADMIN_SECRET}`) {
46
+ return errorResponse(401, "Unauthorized");
47
+ }
48
+ }
49
+
50
  const token = event.cookies.get(COOKIE_NAME);
51
 
52
  let secretSessionId: string;
src/lib/server/database.ts CHANGED
@@ -9,6 +9,7 @@ import type { MessageEvent } from "$lib/types/MessageEvent";
9
  import type { Session } from "$lib/types/Session";
10
  import type { Assistant } from "$lib/types/Assistant";
11
  import type { Report } from "$lib/types/Report";
 
12
 
13
  if (!MONGODB_URL) {
14
  throw new Error(
@@ -24,7 +25,10 @@ export const connectPromise = client.connect().catch(console.error);
24
 
25
  const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));
26
 
 
 
27
  const conversations = db.collection<Conversation>("conversations");
 
28
  const assistants = db.collection<Assistant>("assistants");
29
  const reports = db.collection<Report>("reports");
30
  const sharedConversations = db.collection<SharedConversation>("sharedConversations");
@@ -38,6 +42,7 @@ const bucket = new GridFSBucket(db, { bucketName: "files" });
38
  export { client, db };
39
  export const collections = {
40
  conversations,
 
41
  assistants,
42
  reports,
43
  sharedConversations,
@@ -68,6 +73,33 @@ client.on("open", () => {
68
  { partialFilterExpression: { userId: { $exists: true } } }
69
  )
70
  .catch(console.error);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  abortedGenerations.createIndex({ updatedAt: 1 }, { expireAfterSeconds: 30 }).catch(console.error);
72
  abortedGenerations.createIndex({ conversationId: 1 }, { unique: true }).catch(console.error);
73
  sharedConversations.createIndex({ hash: 1 }, { unique: true }).catch(console.error);
 
9
  import type { Session } from "$lib/types/Session";
10
  import type { Assistant } from "$lib/types/Assistant";
11
  import type { Report } from "$lib/types/Report";
12
+ import type { ConversationStats } from "$lib/types/ConversationStats";
13
 
14
  if (!MONGODB_URL) {
15
  throw new Error(
 
25
 
26
  const db = client.db(MONGODB_DB_NAME + (import.meta.env.MODE === "test" ? "-test" : ""));
27
 
28
+ export const CONVERSATION_STATS_COLLECTION = "conversations.stats";
29
+
30
  const conversations = db.collection<Conversation>("conversations");
31
+ const conversationStats = db.collection<ConversationStats>(CONVERSATION_STATS_COLLECTION);
32
  const assistants = db.collection<Assistant>("assistants");
33
  const reports = db.collection<Report>("reports");
34
  const sharedConversations = db.collection<SharedConversation>("sharedConversations");
 
42
  export { client, db };
43
  export const collections = {
44
  conversations,
45
+ conversationStats,
46
  assistants,
47
  reports,
48
  sharedConversations,
 
73
  { partialFilterExpression: { userId: { $exists: true } } }
74
  )
75
  .catch(console.error);
76
+ // To do stats on conversations
77
+ conversations.createIndex({ updatedAt: 1 }).catch(console.error);
78
+ // Not strictly necessary, could use _id, but more convenient. Also for stats
79
+ conversations.createIndex({ createdAt: 1 }).catch(console.error);
80
+ // To do stats on conversation messages
81
+ conversations.createIndex({ "messages.createdAt": 1 }, { sparse: true }).catch(console.error);
82
+ // Unique index for stats
83
+ conversationStats
84
+ .createIndex(
85
+ {
86
+ type: 1,
87
+ "date.field": 1,
88
+ "date.span": 1,
89
+ "date.at": 1,
90
+ distinct: 1,
91
+ },
92
+ { unique: true }
93
+ )
94
+ .catch(console.error);
95
+ // Allow easy check of last computed stat for given type/dateField
96
+ conversationStats
97
+ .createIndex({
98
+ type: 1,
99
+ "date.field": 1,
100
+ "date.at": 1,
101
+ })
102
+ .catch(console.error);
103
  abortedGenerations.createIndex({ updatedAt: 1 }, { expireAfterSeconds: 30 }).catch(console.error);
104
  abortedGenerations.createIndex({ conversationId: 1 }, { unique: true }).catch(console.error);
105
  sharedConversations.createIndex({ hash: 1 }, { unique: true }).catch(console.error);
src/lib/types/ConversationStats.ts ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import type { Timestamps } from "./Timestamps";
2
+
3
+ export interface ConversationStats extends Timestamps {
4
+ date: {
5
+ at: Date;
6
+ span: "day" | "week" | "month";
7
+ field: "updatedAt" | "createdAt";
8
+ };
9
+ type: "conversation" | "message";
10
+ /** _id => number of conversations/messages in the month */
11
+ distinct: "sessionId" | "userId" | "userOrSessionId" | "_id";
12
+ count: number;
13
+ }
src/routes/admin/export/+server.ts CHANGED
@@ -1,8 +1,4 @@
1
- import {
2
- PARQUET_EXPORT_DATASET,
3
- PARQUET_EXPORT_HF_TOKEN,
4
- PARQUET_EXPORT_SECRET,
5
- } from "$env/static/private";
6
  import { collections } from "$lib/server/database";
7
  import type { Message } from "$lib/types/Message";
8
  import { error } from "@sveltejs/kit";
@@ -13,17 +9,13 @@ import parquet from "parquetjs";
13
  import { z } from "zod";
14
 
15
  // Triger like this:
16
- // curl -X POST "http://localhost:5173/chat/admin/export" -H "Authorization: Bearer <PARQUET_EXPORT_SECRET>" -H "Content-Type: application/json" -d '{"model": "OpenAssistant/oasst-sft-6-llama-30b-xor"}'
17
 
18
  export async function POST({ request }) {
19
- if (!PARQUET_EXPORT_SECRET || !PARQUET_EXPORT_DATASET || !PARQUET_EXPORT_HF_TOKEN) {
20
  throw error(500, "Parquet export is not configured.");
21
  }
22
 
23
- if (request.headers.get("Authorization") !== `Bearer ${PARQUET_EXPORT_SECRET}`) {
24
- throw error(403);
25
- }
26
-
27
  const { model } = z
28
  .object({
29
  model: z.string(),
 
1
+ import { PARQUET_EXPORT_DATASET, PARQUET_EXPORT_HF_TOKEN } from "$env/static/private";
 
 
 
 
2
  import { collections } from "$lib/server/database";
3
  import type { Message } from "$lib/types/Message";
4
  import { error } from "@sveltejs/kit";
 
9
  import { z } from "zod";
10
 
11
  // Triger like this:
12
+ // curl -X POST "http://localhost:5173/chat/admin/export" -H "Authorization: Bearer <ADMIN_API_SECRET>" -H "Content-Type: application/json" -d '{"model": "OpenAssistant/oasst-sft-6-llama-30b-xor"}'
13
 
14
  export async function POST({ request }) {
15
+ if (!PARQUET_EXPORT_DATASET || !PARQUET_EXPORT_HF_TOKEN) {
16
  throw error(500, "Parquet export is not configured.");
17
  }
18
 
 
 
 
 
19
  const { model } = z
20
  .object({
21
  model: z.string(),
src/routes/admin/stats/compute/+server.ts ADDED
@@ -0,0 +1,217 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { json } from "@sveltejs/kit";
2
+ import type { ConversationStats } from "$lib/types/ConversationStats";
3
+ import { CONVERSATION_STATS_COLLECTION, collections } from "$lib/server/database.js";
4
+
5
+ // Triger like this:
6
+ // curl -X POST "http://localhost:5173/chat/admin/stats/compute" -H "Authorization: Bearer <ADMIN_API_SECRET>"
7
+
8
+ export async function POST() {
9
+ for (const span of ["day", "week", "month"] as const) {
10
+ computeStats({ dateField: "updatedAt", type: "conversation", span }).catch(console.error);
11
+ computeStats({ dateField: "createdAt", type: "conversation", span }).catch(console.error);
12
+ computeStats({ dateField: "createdAt", type: "message", span }).catch(console.error);
13
+ }
14
+
15
+ return json({}, { status: 202 });
16
+ }
17
+
18
+ async function computeStats(params: {
19
+ dateField: ConversationStats["date"]["field"];
20
+ span: ConversationStats["date"]["span"];
21
+ type: ConversationStats["type"];
22
+ }) {
23
+ const lastComputed = await collections.conversationStats.findOne(
24
+ { "date.field": params.dateField, "date.span": params.span, type: params.type },
25
+ { sort: { "date.at": -1 } }
26
+ );
27
+
28
+ // If the last computed week is at the beginning of the last computed month, we need to include some days from the previous month
29
+ // In those cases we need to compute the stats from before the last month as everything is one aggregation
30
+ const minDate = lastComputed ? lastComputed.date.at : new Date(0);
31
+
32
+ console.log("Computing stats for", params.type, params.span, params.dateField, "from", minDate);
33
+
34
+ const dateField = params.type === "message" ? "messages." + params.dateField : params.dateField;
35
+
36
+ const pipeline = [
37
+ {
38
+ $match: {
39
+ [dateField]: { $gte: minDate },
40
+ },
41
+ },
42
+ {
43
+ $project: {
44
+ [dateField]: 1,
45
+ sessionId: 1,
46
+ userId: 1,
47
+ },
48
+ },
49
+ ...(params.type === "message"
50
+ ? [
51
+ {
52
+ $unwind: "$messages",
53
+ },
54
+ {
55
+ $match: {
56
+ [dateField]: { $gte: minDate },
57
+ },
58
+ },
59
+ ]
60
+ : []),
61
+ {
62
+ $sort: {
63
+ [dateField]: 1,
64
+ },
65
+ },
66
+ {
67
+ $facet: {
68
+ userId: [
69
+ {
70
+ $match: {
71
+ userId: { $exists: true },
72
+ },
73
+ },
74
+ {
75
+ $group: {
76
+ _id: {
77
+ at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
78
+ userId: "$userId",
79
+ },
80
+ },
81
+ },
82
+ {
83
+ $group: {
84
+ _id: "$_id.at",
85
+ count: { $sum: 1 },
86
+ },
87
+ },
88
+ {
89
+ $project: {
90
+ _id: 0,
91
+ date: {
92
+ at: "$_id",
93
+ field: params.dateField,
94
+ span: params.span,
95
+ },
96
+ distinct: "userId",
97
+ count: 1,
98
+ },
99
+ },
100
+ ],
101
+ sessionId: [
102
+ {
103
+ $match: {
104
+ sessionId: { $exists: true },
105
+ },
106
+ },
107
+ {
108
+ $group: {
109
+ _id: {
110
+ at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
111
+ sessionId: "$sessionId",
112
+ },
113
+ },
114
+ },
115
+ {
116
+ $group: {
117
+ _id: "$_id.at",
118
+ count: { $sum: 1 },
119
+ },
120
+ },
121
+ {
122
+ $project: {
123
+ _id: 0,
124
+ date: {
125
+ at: "$_id",
126
+ field: params.dateField,
127
+ span: params.span,
128
+ },
129
+ distinct: "sessionId",
130
+ count: 1,
131
+ },
132
+ },
133
+ ],
134
+ userOrSessionId: [
135
+ {
136
+ $group: {
137
+ _id: {
138
+ at: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
139
+ userOrSessionId: { $ifNull: ["$userId", "$sessionId"] },
140
+ },
141
+ },
142
+ },
143
+ {
144
+ $group: {
145
+ _id: "$_id.at",
146
+ count: { $sum: 1 },
147
+ },
148
+ },
149
+ {
150
+ $project: {
151
+ _id: 0,
152
+ date: {
153
+ at: "$_id",
154
+ field: params.dateField,
155
+ span: params.span,
156
+ },
157
+ distinct: "userOrSessionId",
158
+ count: 1,
159
+ },
160
+ },
161
+ ],
162
+ _id: [
163
+ {
164
+ $group: {
165
+ _id: { $dateTrunc: { date: `$${dateField}`, unit: params.span } },
166
+ count: { $sum: 1 },
167
+ },
168
+ },
169
+ {
170
+ $project: {
171
+ _id: 0,
172
+ date: {
173
+ at: "$_id",
174
+ field: params.dateField,
175
+ span: params.span,
176
+ },
177
+ distinct: "_id",
178
+ count: 1,
179
+ },
180
+ },
181
+ ],
182
+ },
183
+ },
184
+ {
185
+ $project: {
186
+ stats: {
187
+ $concatArrays: ["$userId", "$sessionId", "$userOrSessionId", "$_id"],
188
+ },
189
+ },
190
+ },
191
+ {
192
+ $unwind: "$stats",
193
+ },
194
+ {
195
+ $replaceRoot: {
196
+ newRoot: "$stats",
197
+ },
198
+ },
199
+ {
200
+ $set: {
201
+ type: params.type,
202
+ },
203
+ },
204
+ {
205
+ $merge: {
206
+ into: CONVERSATION_STATS_COLLECTION,
207
+ on: ["date.at", "type", "date.span", "date.field", "distinct"],
208
+ whenMatched: "replace",
209
+ whenNotMatched: "insert",
210
+ },
211
+ },
212
+ ];
213
+
214
+ await collections.conversations.aggregate(pipeline, { allowDiskUse: true }).next();
215
+
216
+ console.log("Computed stats for", params.type, params.span, params.dateField);
217
+ }