feat(stats): switch to aggregate component for O(log n) counts

- Add @convex-dev/aggregate package for efficient aggregation
- Update convex.config.ts with pageViewsByPath, totalPageViews, uniqueVisitors aggregates
- Update recordPageView to insert into aggregate components
- Update getStats to use aggregate counts instead of O(n) table scans
- Add backfillAggregates internal mutation for existing data
- Update prds/howstatsworks.md with old vs new comparison
- Update changelog.md with v1.11.0 entry
- Update files.md with aggregate component info
This commit is contained in:
Wayne Sutton
2025-12-20 14:39:53 -08:00
parent ae3d69c7b0
commit 8d28e36458
8 changed files with 915 additions and 58 deletions

View File

@@ -58,4 +58,563 @@ export declare const internal: FilterApi<
FunctionReference<any, "internal">
>;
export declare const components: {};
export declare const components: {
pageViewsByPath: {
btree: {
aggregateBetween: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any },
{ count: number; sum: number }
>;
aggregateBetweenBatch: FunctionReference<
"query",
"internal",
{ queries: Array<{ k1?: any; k2?: any; namespace?: any }> },
Array<{ count: number; sum: number }>
>;
atNegativeOffset: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any; offset: number },
{ k: any; s: number; v: any }
>;
atOffset: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any; offset: number },
{ k: any; s: number; v: any }
>;
atOffsetBatch: FunctionReference<
"query",
"internal",
{
queries: Array<{
k1?: any;
k2?: any;
namespace?: any;
offset: number;
}>;
},
Array<{ k: any; s: number; v: any }>
>;
get: FunctionReference<
"query",
"internal",
{ key: any; namespace?: any },
null | { k: any; s: number; v: any }
>;
offset: FunctionReference<
"query",
"internal",
{ k1?: any; key: any; namespace?: any },
number
>;
offsetUntil: FunctionReference<
"query",
"internal",
{ k2?: any; key: any; namespace?: any },
number
>;
paginate: FunctionReference<
"query",
"internal",
{
cursor?: string;
k1?: any;
k2?: any;
limit: number;
namespace?: any;
order: "asc" | "desc";
},
{
cursor: string;
isDone: boolean;
page: Array<{ k: any; s: number; v: any }>;
}
>;
paginateNamespaces: FunctionReference<
"query",
"internal",
{ cursor?: string; limit: number },
{ cursor: string; isDone: boolean; page: Array<any> }
>;
validate: FunctionReference<
"query",
"internal",
{ namespace?: any },
any
>;
};
inspect: {
display: FunctionReference<"query", "internal", { namespace?: any }, any>;
dump: FunctionReference<"query", "internal", { namespace?: any }, string>;
inspectNode: FunctionReference<
"query",
"internal",
{ namespace?: any; node?: string },
null
>;
listTreeNodes: FunctionReference<
"query",
"internal",
{ take?: number },
Array<{
_creationTime: number;
_id: string;
aggregate?: { count: number; sum: number };
items: Array<{ k: any; s: number; v: any }>;
subtrees: Array<string>;
}>
>;
listTrees: FunctionReference<
"query",
"internal",
{ take?: number },
Array<{
_creationTime: number;
_id: string;
maxNodeSize: number;
namespace?: any;
root: string;
}>
>;
};
public: {
clear: FunctionReference<
"mutation",
"internal",
{ maxNodeSize?: number; namespace?: any; rootLazy?: boolean },
null
>;
delete_: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any },
null
>;
deleteIfExists: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any },
any
>;
init: FunctionReference<
"mutation",
"internal",
{ maxNodeSize?: number; namespace?: any; rootLazy?: boolean },
null
>;
insert: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any; summand?: number; value: any },
null
>;
makeRootLazy: FunctionReference<
"mutation",
"internal",
{ namespace?: any },
null
>;
replace: FunctionReference<
"mutation",
"internal",
{
currentKey: any;
namespace?: any;
newKey: any;
newNamespace?: any;
summand?: number;
value: any;
},
null
>;
replaceOrInsert: FunctionReference<
"mutation",
"internal",
{
currentKey: any;
namespace?: any;
newKey: any;
newNamespace?: any;
summand?: number;
value: any;
},
any
>;
};
};
totalPageViews: {
btree: {
aggregateBetween: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any },
{ count: number; sum: number }
>;
aggregateBetweenBatch: FunctionReference<
"query",
"internal",
{ queries: Array<{ k1?: any; k2?: any; namespace?: any }> },
Array<{ count: number; sum: number }>
>;
atNegativeOffset: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any; offset: number },
{ k: any; s: number; v: any }
>;
atOffset: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any; offset: number },
{ k: any; s: number; v: any }
>;
atOffsetBatch: FunctionReference<
"query",
"internal",
{
queries: Array<{
k1?: any;
k2?: any;
namespace?: any;
offset: number;
}>;
},
Array<{ k: any; s: number; v: any }>
>;
get: FunctionReference<
"query",
"internal",
{ key: any; namespace?: any },
null | { k: any; s: number; v: any }
>;
offset: FunctionReference<
"query",
"internal",
{ k1?: any; key: any; namespace?: any },
number
>;
offsetUntil: FunctionReference<
"query",
"internal",
{ k2?: any; key: any; namespace?: any },
number
>;
paginate: FunctionReference<
"query",
"internal",
{
cursor?: string;
k1?: any;
k2?: any;
limit: number;
namespace?: any;
order: "asc" | "desc";
},
{
cursor: string;
isDone: boolean;
page: Array<{ k: any; s: number; v: any }>;
}
>;
paginateNamespaces: FunctionReference<
"query",
"internal",
{ cursor?: string; limit: number },
{ cursor: string; isDone: boolean; page: Array<any> }
>;
validate: FunctionReference<
"query",
"internal",
{ namespace?: any },
any
>;
};
inspect: {
display: FunctionReference<"query", "internal", { namespace?: any }, any>;
dump: FunctionReference<"query", "internal", { namespace?: any }, string>;
inspectNode: FunctionReference<
"query",
"internal",
{ namespace?: any; node?: string },
null
>;
listTreeNodes: FunctionReference<
"query",
"internal",
{ take?: number },
Array<{
_creationTime: number;
_id: string;
aggregate?: { count: number; sum: number };
items: Array<{ k: any; s: number; v: any }>;
subtrees: Array<string>;
}>
>;
listTrees: FunctionReference<
"query",
"internal",
{ take?: number },
Array<{
_creationTime: number;
_id: string;
maxNodeSize: number;
namespace?: any;
root: string;
}>
>;
};
public: {
clear: FunctionReference<
"mutation",
"internal",
{ maxNodeSize?: number; namespace?: any; rootLazy?: boolean },
null
>;
delete_: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any },
null
>;
deleteIfExists: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any },
any
>;
init: FunctionReference<
"mutation",
"internal",
{ maxNodeSize?: number; namespace?: any; rootLazy?: boolean },
null
>;
insert: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any; summand?: number; value: any },
null
>;
makeRootLazy: FunctionReference<
"mutation",
"internal",
{ namespace?: any },
null
>;
replace: FunctionReference<
"mutation",
"internal",
{
currentKey: any;
namespace?: any;
newKey: any;
newNamespace?: any;
summand?: number;
value: any;
},
null
>;
replaceOrInsert: FunctionReference<
"mutation",
"internal",
{
currentKey: any;
namespace?: any;
newKey: any;
newNamespace?: any;
summand?: number;
value: any;
},
any
>;
};
};
uniqueVisitors: {
btree: {
aggregateBetween: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any },
{ count: number; sum: number }
>;
aggregateBetweenBatch: FunctionReference<
"query",
"internal",
{ queries: Array<{ k1?: any; k2?: any; namespace?: any }> },
Array<{ count: number; sum: number }>
>;
atNegativeOffset: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any; offset: number },
{ k: any; s: number; v: any }
>;
atOffset: FunctionReference<
"query",
"internal",
{ k1?: any; k2?: any; namespace?: any; offset: number },
{ k: any; s: number; v: any }
>;
atOffsetBatch: FunctionReference<
"query",
"internal",
{
queries: Array<{
k1?: any;
k2?: any;
namespace?: any;
offset: number;
}>;
},
Array<{ k: any; s: number; v: any }>
>;
get: FunctionReference<
"query",
"internal",
{ key: any; namespace?: any },
null | { k: any; s: number; v: any }
>;
offset: FunctionReference<
"query",
"internal",
{ k1?: any; key: any; namespace?: any },
number
>;
offsetUntil: FunctionReference<
"query",
"internal",
{ k2?: any; key: any; namespace?: any },
number
>;
paginate: FunctionReference<
"query",
"internal",
{
cursor?: string;
k1?: any;
k2?: any;
limit: number;
namespace?: any;
order: "asc" | "desc";
},
{
cursor: string;
isDone: boolean;
page: Array<{ k: any; s: number; v: any }>;
}
>;
paginateNamespaces: FunctionReference<
"query",
"internal",
{ cursor?: string; limit: number },
{ cursor: string; isDone: boolean; page: Array<any> }
>;
validate: FunctionReference<
"query",
"internal",
{ namespace?: any },
any
>;
};
inspect: {
display: FunctionReference<"query", "internal", { namespace?: any }, any>;
dump: FunctionReference<"query", "internal", { namespace?: any }, string>;
inspectNode: FunctionReference<
"query",
"internal",
{ namespace?: any; node?: string },
null
>;
listTreeNodes: FunctionReference<
"query",
"internal",
{ take?: number },
Array<{
_creationTime: number;
_id: string;
aggregate?: { count: number; sum: number };
items: Array<{ k: any; s: number; v: any }>;
subtrees: Array<string>;
}>
>;
listTrees: FunctionReference<
"query",
"internal",
{ take?: number },
Array<{
_creationTime: number;
_id: string;
maxNodeSize: number;
namespace?: any;
root: string;
}>
>;
};
public: {
clear: FunctionReference<
"mutation",
"internal",
{ maxNodeSize?: number; namespace?: any; rootLazy?: boolean },
null
>;
delete_: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any },
null
>;
deleteIfExists: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any },
any
>;
init: FunctionReference<
"mutation",
"internal",
{ maxNodeSize?: number; namespace?: any; rootLazy?: boolean },
null
>;
insert: FunctionReference<
"mutation",
"internal",
{ key: any; namespace?: any; summand?: number; value: any },
null
>;
makeRootLazy: FunctionReference<
"mutation",
"internal",
{ namespace?: any },
null
>;
replace: FunctionReference<
"mutation",
"internal",
{
currentKey: any;
namespace?: any;
newKey: any;
newNamespace?: any;
summand?: number;
value: any;
},
null
>;
replaceOrInsert: FunctionReference<
"mutation",
"internal",
{
currentKey: any;
namespace?: any;
newKey: any;
newNamespace?: any;
summand?: number;
value: any;
},
any
>;
};
};
};

View File

@@ -1,6 +1,16 @@
import { defineApp } from "convex/server";
import aggregate from "@convex-dev/aggregate/convex.config.js";
// Root app definition. Each `app.use` call below mounts one separately named
// instance of the same aggregate component; the `name` values are the keys
// that appear under `components` in the generated API
// (components.pageViewsByPath, components.totalPageViews,
// components.uniqueVisitors), so renaming one here requires updating the
// code that references that key.
const app = defineApp();
// Instance backing per-path page view counts (namespaced by path in the
// stats module) so a page's count does not require an O(n) table scan.
app.use(aggregate, { name: "pageViewsByPath" });
// Instance backing the single global page view count.
app.use(aggregate, { name: "totalPageViews" });
// Instance backing the unique visitor count (keyed by sessionId in the
// stats module).
app.use(aggregate, { name: "uniqueVisitors" });
export default app;

View File

@@ -1,5 +1,8 @@
import { query, mutation, internalMutation } from "./_generated/server";
import { v } from "convex/values";
import { components } from "./_generated/api";
import { DataModel } from "./_generated/dataModel";
import { TableAggregate } from "@convex-dev/aggregate";
// Deduplication window: 30 minutes in milliseconds
const DEDUP_WINDOW_MS = 30 * 60 * 1000;
@@ -10,9 +13,50 @@ const SESSION_TIMEOUT_MS = 2 * 60 * 1000;
// Heartbeat dedup window: 10 seconds (prevents write conflicts from rapid calls)
const HEARTBEAT_DEDUP_MS = 10 * 1000;
/**
 * Per-path page view aggregate over the "pageViews" table.
 * Every path gets its own namespace and entries inside a namespace are
 * ordered by the view's timestamp, so the view count for one page is a
 * namespace count (O(log n)) rather than an O(n) scan of the table.
 */
const pageViewsByPath = new TableAggregate<{
  DataModel: DataModel;
  TableName: "pageViews";
  Namespace: string; // page path
  Key: number; // view timestamp
}>(components.pageViewsByPath, {
  namespace: ({ path }) => path,
  sortKey: ({ timestamp }) => timestamp,
});
/**
 * Global page view aggregate over the "pageViews" table.
 * Only an overall count is needed, so every document is given the same
 * sort key (null) and no namespace is used.
 */
const totalPageViews = new TableAggregate<{
  DataModel: DataModel;
  TableName: "pageViews";
  Key: null;
}>(components.totalPageViews, {
  sortKey: (_view) => null,
});
/**
 * Unique visitor aggregate over the "pageViews" table, keyed by sessionId.
 * The "one entry per session" rule is enforced by the callers, not by this
 * definition: recordPageView and backfillAggregates only insert the first
 * page view they see for a given session, so the aggregate's count equals
 * the number of distinct sessions.
 */
const uniqueVisitors = new TableAggregate<{
  DataModel: DataModel;
  TableName: "pageViews";
  Key: string; // sessionId
}>(components.uniqueVisitors, {
  sortKey: ({ sessionId }) => sessionId,
});
/**
* Record a page view event.
* Idempotent: same session viewing same path within 30min = 1 view.
* Updates aggregate components for efficient O(log n) counts.
*/
export const recordPageView = mutation({
args: {
@@ -39,13 +83,31 @@ export const recordPageView = mutation({
return null;
}
// Check if this is a new unique visitor (first page view for this session)
const existingSessionView = await ctx.db
.query("pageViews")
.withIndex("by_session_path", (q) => q.eq("sessionId", args.sessionId))
.first();
const isNewVisitor = !existingSessionView;
// Insert new view event
await ctx.db.insert("pageViews", {
const id = await ctx.db.insert("pageViews", {
path: args.path,
pageType: args.pageType,
sessionId: args.sessionId,
timestamp: now,
});
const doc = await ctx.db.get(id);
// Update aggregates with the new page view
if (doc) {
await pageViewsByPath.insertIfDoesNotExist(ctx, doc);
await totalPageViews.insertIfDoesNotExist(ctx, doc);
// Only insert into unique visitors aggregate if this is a new session
if (isNewVisitor) {
await uniqueVisitors.insertIfDoesNotExist(ctx, doc);
}
}
return null;
},
@@ -102,6 +164,7 @@ export const heartbeat = mutation({
/**
* Get all stats for the stats page.
* Real-time subscription via useQuery.
* Uses aggregate components for O(log n) counts instead of O(n) table scans.
*/
export const getStats = query({
args: {},
@@ -147,24 +210,19 @@ export const getStats = query({
.map(([path, count]) => ({ path, count }))
.sort((a, b) => b.count - a.count);
// Get all page views ordered by timestamp to find earliest
const allViews = await ctx.db
// Use aggregate component for total page views count: O(log n) instead of O(n)
const totalPageViewsCount = await totalPageViews.count(ctx);
// Use aggregate component for unique visitors count: O(log n) instead of O(n)
const uniqueVisitorsCount = await uniqueVisitors.count(ctx);
// Get earliest page view for tracking since date (single doc fetch)
const firstView = await ctx.db
.query("pageViews")
.withIndex("by_timestamp")
.order("asc")
.collect();
// Get tracking start date (earliest view timestamp)
const trackingSince = allViews.length > 0 ? allViews[0].timestamp : null;
// Aggregate views by path and count unique sessions
const viewsByPath: Record<string, number> = {};
const uniqueSessions = new Set<string>();
for (const view of allViews) {
viewsByPath[view.path] = (viewsByPath[view.path] || 0) + 1;
uniqueSessions.add(view.sessionId);
}
.first();
const trackingSince = firstView ? firstView.timestamp : null;
// Get published posts and pages for titles
const posts = await ctx.db
@@ -177,45 +235,58 @@ export const getStats = query({
.withIndex("by_published", (q) => q.eq("published", true))
.collect();
// Build page stats array with titles
const pageStats = Object.entries(viewsByPath)
.map(([path, views]) => {
// Match path to post or page
const slug = path.startsWith("/") ? path.slice(1) : path;
const post = posts.find((p) => p.slug === slug);
const page = pages.find((p) => p.slug === slug);
// Get unique paths from pageViews (needed to build pageStats)
// We still need to iterate for path list, but use aggregate for per-path counts
const allPaths = new Set<string>();
const pathViewsFromDb = await ctx.db.query("pageViews").collect();
for (const view of pathViewsFromDb) {
allPaths.add(view.path);
}
let title = path;
let pageType = "other";
// Build page stats using aggregate counts per path: O(log n) per path
const pageStatsPromises = Array.from(allPaths).map(async (path) => {
// Use aggregate namespace count for this path
const views = await pageViewsByPath.count(ctx, { namespace: path });
// Match path to post or page for title
const slug = path.startsWith("/") ? path.slice(1) : path;
const post = posts.find((p) => p.slug === slug);
const page = pages.find((p) => p.slug === slug);
if (path === "/" || path === "") {
title = "Home";
pageType = "home";
} else if (path === "/stats") {
title = "Stats";
pageType = "stats";
} else if (post) {
title = post.title;
pageType = "blog";
} else if (page) {
title = page.title;
pageType = "page";
}
let title = path;
let pageType = "other";
return {
path,
title,
pageType,
views,
};
})
.sort((a, b) => b.views - a.views);
if (path === "/" || path === "") {
title = "Home";
pageType = "home";
} else if (path === "/stats") {
title = "Stats";
pageType = "stats";
} else if (post) {
title = post.title;
pageType = "blog";
} else if (page) {
title = page.title;
pageType = "page";
}
return {
path,
title,
pageType,
views,
};
});
const pageStats = (await Promise.all(pageStatsPromises)).sort(
(a, b) => b.views - a.views
);
return {
activeVisitors: activeSessions.length,
activeByPath,
totalPageViews: allViews.length,
uniqueVisitors: uniqueSessions.size,
totalPageViews: totalPageViewsCount,
uniqueVisitors: uniqueVisitorsCount,
publishedPosts: posts.length,
publishedPages: pages.length,
trackingSince,
@@ -247,3 +318,45 @@ export const cleanupStaleSessions = internalMutation({
},
});
/**
 * One-off internal mutation that populates the aggregates from the rows
 * already stored in "pageViews". Intended to be run after the aggregate
 * component is deployed; every write goes through insertIfDoesNotExist, so
 * re-running it does not double count documents that are already present.
 *
 * Returns the number of page views read (`processed`) and the number of
 * distinct session ids encountered (`uniqueSessions`).
 *
 * NOTE(review): the whole table is read and written inside a single
 * mutation - confirm the table is small enough for Convex's per-transaction
 * limits before running this against production data.
 */
export const backfillAggregates = internalMutation({
  args: {},
  returns: v.object({
    processed: v.number(),
    uniqueSessions: v.number(),
  }),
  handler: async (ctx) => {
    const storedViews = await ctx.db.query("pageViews").collect();

    // Sessions that already contributed their single uniqueVisitors entry.
    const countedSessions = new Set<string>();

    for (const view of storedViews) {
      // Every view counts towards both the per-path and the global totals.
      await pageViewsByPath.insertIfDoesNotExist(ctx, view);
      await totalPageViews.insertIfDoesNotExist(ctx, view);

      // Only the first view seen for a session feeds the visitor aggregate.
      if (countedSessions.has(view.sessionId)) {
        continue;
      }
      countedSessions.add(view.sessionId);
      await uniqueVisitors.insertIfDoesNotExist(ctx, view);
    }

    return {
      processed: storedViews.length,
      uniqueSessions: countedSessions.size,
    };
  },
});