Mirror of https://github.com/waynesutton/markdown-site.git, synced 2026-01-12 04:09:14 +00:00
feat: implement chunked backfilling for aggregate component
- Add backfillAggregatesChunk mutation that processes 500 records at a time
- Use pagination and ctx.scheduler.runAfter to chain batch processing
- Prevent Convex 16MB memory limit issues with large datasets
- Make progress visible in the Convex dashboard logs
- Track seen session IDs across chunks for unique visitor counting
- Update howstatsworks.md with chunked backfilling documentation
- Add v1.11.1 changelog entries
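Distilled from the diff below, the chaining works by paginating one bounded batch at a time and having the mutation reschedule itself until the cursor is exhausted. A minimal sketch of the pattern (myTable, myModule, and processChunk are placeholder names, not identifiers from this commit):

import { v } from "convex/values";
import { internalMutation } from "./_generated/server";
import { internal } from "./_generated/api";

export const processChunk = internalMutation({
  args: { cursor: v.union(v.string(), v.null()) },
  returns: v.null(),
  handler: async (ctx, args) => {
    // Read one bounded page instead of collect()-ing the whole table.
    const result = await ctx.db
      .query("myTable")
      .paginate({ numItems: 500, cursor: args.cursor });

    for (const doc of result.page) {
      // ...process one document...
    }

    // Chain the next batch until pagination reports completion.
    if (!result.isDone) {
      await ctx.scheduler.runAfter(0, internal.myModule.processChunk, {
        cursor: result.continueCursor,
      });
    }
    return null;
  },
});

The commit's own scheduler calls reach the function handle through a dynamic import cast to any, presumably to sidestep the circular type inference that self-scheduling from the same file can trigger.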
1 changed file: convex/stats.ts (112 changed lines)
@@ -319,33 +319,44 @@ export const cleanupStaleSessions = internalMutation({
   },
 });
 
+// Batch size for chunked backfilling (keeps memory usage under 16MB limit)
+const BACKFILL_BATCH_SIZE = 500;
+
 /**
- * Internal mutation to backfill aggregates from existing pageViews data.
- * Run this once after deploying the aggregate component to populate counts.
- * Uses idempotent insertIfDoesNotExist so it's safe to run multiple times.
+ * Internal mutation to backfill aggregates in chunks.
+ * Processes BACKFILL_BATCH_SIZE records at a time to avoid memory limits.
+ * Schedules itself to continue with the next batch until complete.
  */
-export const backfillAggregates = internalMutation({
-  args: {},
+export const backfillAggregatesChunk = internalMutation({
+  args: {
+    cursor: v.union(v.string(), v.null()),
+    totalProcessed: v.number(),
+    seenSessionIds: v.array(v.string()),
+  },
   returns: v.object({
+    status: v.union(v.literal("in_progress"), v.literal("complete")),
     processed: v.number(),
     uniqueSessions: v.number(),
+    cursor: v.union(v.string(), v.null()),
   }),
-  handler: async (ctx) => {
-    // Get all page views
-    const allViews = await ctx.db.query("pageViews").collect();
-
-    // Track unique sessions to avoid duplicate inserts
-    const seenSessions = new Set<string>();
+  handler: async (ctx, args) => {
+    // Paginate through pageViews in batches
+    const result = await ctx.db
+      .query("pageViews")
+      .paginate({ numItems: BACKFILL_BATCH_SIZE, cursor: args.cursor });
+
+    // Track unique sessions (restore from previous chunks)
+    const seenSessions = new Set<string>(args.seenSessionIds);
     let uniqueCount = 0;
 
-    // Process each view and update aggregates
-    for (const doc of allViews) {
+    // Process each view in this batch
+    for (const doc of result.page) {
       // Insert into pageViewsByPath aggregate (one per view)
       await pageViewsByPath.insertIfDoesNotExist(ctx, doc);
 
       // Insert into totalPageViews aggregate (one per view)
       await totalPageViews.insertIfDoesNotExist(ctx, doc);
 
       // Insert into uniqueVisitors aggregate (one per session)
       if (!seenSessions.has(doc.sessionId)) {
         seenSessions.add(doc.sessionId);
@@ -353,11 +364,74 @@ export const backfillAggregates = internalMutation({
         uniqueCount++;
       }
     }
 
+    const newTotalProcessed = args.totalProcessed + result.page.length;
+
+    // If there are more records, schedule the next chunk
+    if (!result.isDone) {
+      // Convert Set to array for passing to next chunk (limited to prevent arg size issues)
+      // Only keep the last 10000 session IDs to prevent argument size explosion
+      const sessionArray = Array.from(seenSessions).slice(-10000);
+
+      await ctx.scheduler.runAfter(
+        0,
+        // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        (await import("./_generated/api")).internal.stats.backfillAggregatesChunk as any,
+        {
+          cursor: result.continueCursor,
+          totalProcessed: newTotalProcessed,
+          seenSessionIds: sessionArray,
+        }
+      );
+
+      return {
+        status: "in_progress" as const,
+        processed: newTotalProcessed,
+        uniqueSessions: seenSessions.size,
+        cursor: result.continueCursor,
+      };
+    }
+
+    // Backfilling complete
     return {
-      processed: allViews.length,
-      uniqueSessions: uniqueCount,
+      status: "complete" as const,
+      processed: newTotalProcessed,
+      uniqueSessions: seenSessions.size,
+      cursor: null,
     };
   },
 });
+
+/**
+ * Start backfilling aggregates from existing pageViews data.
+ * This kicks off the chunked backfill process.
+ * Safe to call multiple times (uses insertIfDoesNotExist).
+ */
+export const backfillAggregates = internalMutation({
+  args: {},
+  returns: v.object({
+    message: v.string(),
+  }),
+  handler: async (ctx) => {
+    // Check if there are any pageViews to backfill
+    const firstView = await ctx.db.query("pageViews").first();
+    if (!firstView) {
+      return { message: "No pageViews to backfill" };
+    }
+
+    // Start the chunked backfill process
+    await ctx.scheduler.runAfter(
+      0,
+      // eslint-disable-next-line @typescript-eslint/no-explicit-any
+      (await import("./_generated/api")).internal.stats.backfillAggregatesChunk as any,
+      {
+        cursor: null,
+        totalProcessed: 0,
+        seenSessionIds: [],
+      }
+    );
+
+    return { message: "Backfill started. Check logs for progress." };
+  },
+});
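With a standard Convex setup, the backfill can be started from the dashboard or the CLI, for example:

    npx convex run stats:backfillAggregates

backfillAggregates returns immediately after scheduling the first chunk; each chunk then reports its running totals, which is what makes progress visible in the dashboard logs. Note that seenSessionIds is capped at the last 10,000 entries per hop, so on very large datasets the reported uniqueSessions count is an approximation rather than an exact figure.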