From c44d3228c6d4d17ac348d060446ba1522aaabe01 Mon Sep 17 00:00:00 2001 From: Andreas Knuth Date: Thu, 30 Apr 2026 09:50:18 -0500 Subject: [PATCH] SES volume --- backend/src/routes/billing.ts | 64 ++-- backend/src/services/ses-events.ts | 206 ++++++++++--- frontend/src/components/BillingModal.jsx | 369 +++++++++++++++++------ frontend/src/services/api.js | 26 +- 4 files changed, 485 insertions(+), 180 deletions(-) diff --git a/backend/src/routes/billing.ts b/backend/src/routes/billing.ts index 446669b..409aacf 100644 --- a/backend/src/routes/billing.ts +++ b/backend/src/routes/billing.ts @@ -1,29 +1,23 @@ import { Router } from 'express'; import { requireAuth, requireSuperAdmin } from '../middleware/auth.js'; import { computeMonthlyBilling, listBillingEvents, PRICE_PER_INBOX } from '../services/billing.js'; -import { getDomainVolumeForMonth, currentYm } from '../services/ses-events.js'; +import { + getDomainVolumeForMonth, + getVolumeOverview, + currentYm, + previousYm, +} from '../services/ses-events.js'; export const billingRouter = Router(); billingRouter.use(requireAuth); billingRouter.use(requireSuperAdmin); -/** - * GET /api/billing/summary?domain=foo.com - * Inbox-count based monthly summary ($5 per inbox per month). - */ billingRouter.get('/summary', async (req, res) => { const domain = req.query.domain ? String(req.query.domain).toLowerCase() : undefined; const months = await computeMonthlyBilling({ domain }); - res.json({ - price_per_inbox: PRICE_PER_INBOX, - months, - }); + res.json({ price_per_inbox: PRICE_PER_INBOX, months }); }); -/** - * GET /api/billing/events?domain=foo.com - * Raw mailbox lifecycle event log (created/deleted). - */ billingRouter.get('/events', async (req, res) => { const events = await listBillingEvents({ domain: req.query.domain ? String(req.query.domain).toLowerCase() : undefined, @@ -36,13 +30,7 @@ billingRouter.get('/events', async (req, res) => { /** * GET /api/billing/volume?domain=foo.com&ym=2026-04 - * SES outbound volume for a domain in a specific month. - * - * Returns per-inbox + domain totals with send count, total bytes, - * bounce count and complaint count. - * - * Defaults to the current month if ym is omitted. - * Domain is required because volume is always per-domain. + * Per-inbox drilldown for a single domain. */ billingRouter.get('/volume', async (req, res) => { const domain = req.query.domain ? String(req.query.domain).toLowerCase() : ''; @@ -51,16 +39,38 @@ billingRouter.get('/volume', async (req, res) => { return; } - const ym = req.query.ym - ? String(req.query.ym) - : currentYm(); - - // Validate ym format + const ym = req.query.ym ? String(req.query.ym) : currentYm(); if (!/^\d{4}-\d{2}$/.test(ym)) { res.status(400).json({ error: 'ym must be in YYYY-MM format' }); return; } - const volume = await getDomainVolumeForMonth(domain, ym); - res.json(volume); + res.json(await getDomainVolumeForMonth(domain, ym)); +}); + +/** + * GET /api/billing/volume-overview?ym=2026-04 + * Cross-domain overview for super admin. One DynamoDB scan, aggregates + * per domain. ym defaults to the current month. + * + * Returns: + * { + * ym: 'YYYY-MM', + * total_send_count, total_bounce_count, total_complaint_count, total_inbox_count, + * rows: [{ domain, send_count, bytes_total, bounce_count, complaint_count, inbox_count }, ...] + * } + */ +billingRouter.get('/volume-overview', async (req, res) => { + // Limit to current/previous month per product decision — no point in + // accepting arbitrary ym values from the client right now. + const ym = req.query.ym ? String(req.query.ym) : currentYm(); + const allowed = new Set([currentYm(), previousYm()]); + if (!allowed.has(ym)) { + res.status(400).json({ + error: `ym must be one of: ${[...allowed].join(', ')}`, + }); + return; + } + + res.json(await getVolumeOverview(ym)); }); diff --git a/backend/src/services/ses-events.ts b/backend/src/services/ses-events.ts index aa16839..c83aa25 100644 --- a/backend/src/services/ses-events.ts +++ b/backend/src/services/ses-events.ts @@ -1,5 +1,5 @@ import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; -import { DynamoDBDocumentClient, QueryCommand } from '@aws-sdk/lib-dynamodb'; +import { DynamoDBDocumentClient, QueryCommand, ScanCommand } from '@aws-sdk/lib-dynamodb'; import { config } from '../config.js'; import { pool } from '../db.js'; @@ -13,7 +13,7 @@ const doc = DynamoDBDocumentClient.from( export interface InboxVolume { email: string; domain: string; - ym: string; // 'YYYY-MM' + ym: string; send_count: number; bytes_total: number; bounce_count: number; @@ -31,15 +31,27 @@ export interface DomainVolume { } interface RawEventRow { + pk?: string; event_type: 'send' | 'bounce' | 'complaint'; size_bytes?: number; } -/** - * Query all events for a single (email, ym) bucket and aggregate them - * into counts. This is the lowest-level helper used by the higher-level - * domain aggregator below. - */ +export function currentYm(): string { + const now = new Date(); + return `${now.getUTCFullYear()}-${String(now.getUTCMonth() + 1).padStart(2, '0')}`; +} + +export function previousYm(): string { + const now = new Date(); + // Construct first day of previous month in UTC, then format. + const d = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1)); + return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`; +} + +// ============================================================ +// Single-domain volume (used for the per-inbox drilldown) +// ============================================================ + async function aggregateInbox(domain: string, email: string, ym: string): Promise { const pk = `${domain}#${email}#${ym}`; @@ -49,16 +61,12 @@ async function aggregateInbox(domain: string, email: string, ym: string): Promis let bounce_count = 0; let complaint_count = 0; - // DynamoDB Query is paginated — loop until done. For a single - // mailbox-month the result set is normally small (< few thousand) - // so this loops at most a couple of times. do { const resp = await doc.send(new QueryCommand({ TableName: TABLE_NAME, KeyConditionExpression: 'pk = :pk', ExpressionAttributeValues: { ':pk': pk }, ExclusiveStartKey: lastEvaluatedKey, - // We only need a few fields for aggregation, not the full row. ProjectionExpression: 'event_type, size_bytes', })); @@ -79,30 +87,25 @@ async function aggregateInbox(domain: string, email: string, ym: string): Promis return { email, domain, ym, send_count, bytes_total, bounce_count, complaint_count }; } -/** - * Get the full domain volume for a given month. - * - * Looks up all mailboxes (active + deleted) that ever existed in the - * domain — we use the mailboxes table for that, which is cheap. Then - * for each mailbox we aggregate the events. Mailboxes with zero events - * are omitted from the per_inbox list (but they're free anyway). - */ export async function getDomainVolumeForMonth(domain: string, ym: string): Promise { const d = domain.toLowerCase(); - // Pull every mailbox that ever existed for this domain. Even - // soft-deleted ones may have sent mails before deletion, and we - // want to show those in historical months. const result = await pool.query( `SELECT email_address FROM mailboxes WHERE domain=$1`, [d], ); - const emails: string[] = result.rows.map((r: any) => String(r.email_address)); + const emailsFromPg: string[] = result.rows.map((r: any) => String(r.email_address)); - // Aggregate in parallel — DynamoDB is fine with this. - const perInbox = await Promise.all(emails.map((e) => aggregateInbox(d, e, ym))); + // Also discover any email addresses that have events in DynamoDB but are + // no longer in the local PostgreSQL (e.g. mailboxes that were created on + // a different node, or hard-deleted from PostgreSQL but still have events). + // We do a targeted Scan filtered by the domain prefix on pk. + const emailsFromDdb = await listEmailsForDomain(d, ym); + + const allEmails = Array.from(new Set([...emailsFromPg, ...emailsFromDdb])); + + const perInbox = await Promise.all(allEmails.map((e) => aggregateInbox(d, e, ym))); - // Drop empty entries; sort the rest by send_count desc. const nonEmpty = perInbox.filter( (v) => v.send_count > 0 || v.bounce_count > 0 || v.complaint_count > 0, ); @@ -118,19 +121,148 @@ export async function getDomainVolumeForMonth(domain: string, ym: string): Promi { send_count: 0, bytes_total: 0, bounce_count: 0, complaint_count: 0 }, ); - return { - domain: d, - ym, - ...totals, - per_inbox: nonEmpty, - }; + return { domain: d, ym, ...totals, per_inbox: nonEmpty }; } /** - * Convenience: get volume for the current month across all mailboxes - * in a domain. + * Find the set of email addresses that have any event for the given + * (domain, ym) bucket. Used to ensure the per-inbox drilldown surfaces + * historical mailboxes that no longer exist in the local PostgreSQL. + * + * Uses a Scan with a begins_with filter on pk because we don't have + * a GSI on domain. For typical event volumes this is fast enough; a + * GSI can be added later if it becomes a hot spot. */ -export function currentYm(): string { - const now = new Date(); - return `${now.getUTCFullYear()}-${String(now.getUTCMonth() + 1).padStart(2, '0')}`; +async function listEmailsForDomain(domain: string, ym: string): Promise { + const prefix = `${domain}#`; + const suffix = `#${ym}`; + const emails = new Set(); + let lastEvaluatedKey: Record | undefined; + + do { + const resp = await doc.send(new ScanCommand({ + TableName: TABLE_NAME, + FilterExpression: 'begins_with(pk, :p)', + ExpressionAttributeValues: { ':p': prefix }, + ProjectionExpression: 'pk', + ExclusiveStartKey: lastEvaluatedKey, + })); + + for (const row of (resp.Items ?? []) as { pk?: string }[]) { + if (!row.pk) continue; + // pk format: "domain#email#YYYY-MM" + if (!row.pk.endsWith(suffix)) continue; + const inner = row.pk.slice(prefix.length, row.pk.length - suffix.length); + if (inner) emails.add(inner); + } + + lastEvaluatedKey = resp.LastEvaluatedKey; + } while (lastEvaluatedKey); + + return [...emails]; +} + +// ============================================================ +// All-domains overview (single Scan, no per-domain Query loop) +// ============================================================ + +export interface DomainOverviewRow { + domain: string; + send_count: number; + bytes_total: number; + bounce_count: number; + complaint_count: number; + inbox_count: number; +} + +export interface VolumeOverview { + ym: string; + total_send_count: number; + total_bounce_count: number; + total_complaint_count: number; + total_inbox_count: number; + rows: DomainOverviewRow[]; +} + +/** + * Cross-domain overview. One Scan reads all events for the given month, + * aggregated per domain. We rely on the fact that pk encodes both the + * domain and the ym, so a server-side FilterExpression cuts the scanned + * set down to just this month. + * + * Note: DynamoDB Scan still reads the entire table for billing, but the + * filter reduces network bytes returned. For our event volumes that's + * acceptable. If the table grows past a few hundred thousand items per + * month we should add a GSI keyed on (ym). + */ +export async function getVolumeOverview(ym: string): Promise { + const suffix = `#${ym}`; + let lastEvaluatedKey: Record | undefined; + + // Per-domain accumulators with a Set for unique emails. + const acc = new Map; + }>(); + + do { + const resp = await doc.send(new ScanCommand({ + TableName: TABLE_NAME, + // Filter at the server: only pk values ending in this month. + // DynamoDB doesn't support "ends_with" on a key, so we use + // contains() — works because '#2026-04' is unique enough. + FilterExpression: 'contains(pk, :ym)', + ExpressionAttributeValues: { ':ym': suffix }, + ProjectionExpression: 'pk, event_type, size_bytes', + ExclusiveStartKey: lastEvaluatedKey, + })); + + for (const row of (resp.Items ?? []) as RawEventRow[]) { + if (!row.pk || !row.pk.endsWith(suffix)) continue; + // Strip the trailing "#ym" then split into [domain, email]. + const head = row.pk.slice(0, row.pk.length - suffix.length); + const sep = head.indexOf('#'); + if (sep < 0) continue; + const domain = head.slice(0, sep); + const email = head.slice(sep + 1); + + let entry = acc.get(domain); + if (!entry) { + entry = { send: 0, bytes: 0, bounce: 0, complaint: 0, inboxes: new Set() }; + acc.set(domain, entry); + } + entry.inboxes.add(email); + + if (row.event_type === 'send') { + entry.send++; + entry.bytes += Number(row.size_bytes ?? 0); + } else if (row.event_type === 'bounce') { + entry.bounce++; + } else if (row.event_type === 'complaint') { + entry.complaint++; + } + } + + lastEvaluatedKey = resp.LastEvaluatedKey; + } while (lastEvaluatedKey); + + const rows: DomainOverviewRow[] = [...acc.entries()].map(([domain, e]) => ({ + domain, + send_count: e.send, + bytes_total: e.bytes, + bounce_count: e.bounce, + complaint_count: e.complaint, + inbox_count: e.inboxes.size, + })).sort((a, b) => b.send_count - a.send_count || a.domain.localeCompare(b.domain)); + + const totals = rows.reduce( + (s, r) => ({ + total_send_count: s.total_send_count + r.send_count, + total_bounce_count: s.total_bounce_count + r.bounce_count, + total_complaint_count: s.total_complaint_count + r.complaint_count, + total_inbox_count: s.total_inbox_count + r.inbox_count, + }), + { total_send_count: 0, total_bounce_count: 0, total_complaint_count: 0, total_inbox_count: 0 }, + ); + + return { ym, ...totals, rows }; } diff --git a/frontend/src/components/BillingModal.jsx b/frontend/src/components/BillingModal.jsx index 0b4c499..ef3e731 100644 --- a/frontend/src/components/BillingModal.jsx +++ b/frontend/src/components/BillingModal.jsx @@ -1,8 +1,11 @@ import React, { useEffect, useMemo, useState } from 'react'; import { FiDollarSign, FiList, FiPlus, FiMinus, FiInbox, FiSend, - FiAlertCircle, FiAlertTriangle, + FiAlertCircle, FiAlertTriangle, FiArrowLeft, FiArrowRight, } from 'react-icons/fi'; +import { + BarChart, Bar, XAxis, YAxis, Tooltip, ResponsiveContainer, CartesianGrid, +} from 'recharts'; import Modal from './Modal'; import LoadingOverlay from './LoadingOverlay'; import { billingAPI } from '../services/api'; @@ -22,14 +25,28 @@ const currentYm = () => { const d = new Date(); return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`; }; +const previousYm = () => { + const now = new Date(); + const d = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1)); + return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`; +}; +const ymLabel = (ym) => { + const [y, m] = ym.split('-').map(Number); + return `${MONTH_NAMES[m - 1]} ${y}`; +}; const BillingModal = ({ open, domains, onClose, onToast }) => { const [activeTab, setActiveTab] = useState('summary'); - const [domainFilter, setDomainFilter] = useState(''); - const [ymFilter, setYmFilter] = useState(currentYm()); + const [domainFilter, setDomainFilter] = useState(''); // for summary/events + const [ymFilter, setYmFilter] = useState(currentYm()); // for volume const [summary, setSummary] = useState(null); const [events, setEvents] = useState([]); - const [volume, setVolume] = useState(null); + + // Volume tab state: overview vs drilldown + const [overview, setOverview] = useState(null); + const [drilldownDomain, setDrilldownDomain] = useState(null); + const [drilldownData, setDrilldownData] = useState(null); + const [loading, setLoading] = useState(false); const sortedDomains = useMemo( @@ -37,43 +54,43 @@ const BillingModal = ({ open, domains, onClose, onToast }) => { [domains] ); - // Build a list of months to choose from, going back ~12 months from now. - const ymOptions = useMemo(() => { - const out = []; - const now = new Date(); - for (let i = 0; i < 12; i++) { - const d = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - i, 1)); - const ym = `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`; - out.push({ ym, label: `${MONTH_NAMES[d.getUTCMonth()]} ${d.getUTCFullYear()}` }); - } - return out; - }, []); + // Volume tab is restricted to current + previous month (per product decision). + const ymOptions = useMemo(() => ([ + { ym: currentYm(), label: ymLabel(currentYm()) }, + { ym: previousYm(), label: ymLabel(previousYm()) }, + ]), []); - const reload = async () => { - if (!open) return; + const reloadSummary = async () => { setLoading(true); try { - const filter = domainFilter || undefined; - - if (activeTab === 'summary') { - setSummary(await billingAPI.summary(filter)); - } else if (activeTab === 'events') { - setEvents(await billingAPI.events({ domain: filter, limit: 500 })); - } else if (activeTab === 'volume') { - if (!filter) { - // Volume requires a specific domain. - setVolume(null); - } else { - setVolume(await billingAPI.volume({ domain: filter, ym: ymFilter })); - } - } - } catch (err) { - onToast?.(`Failed to load: ${err.message}`, 'error'); - } finally { - setLoading(false); - } + setSummary(await billingAPI.summary(domainFilter || undefined)); + } catch (err) { onToast?.(`Failed to load: ${err.message}`, 'error'); } + finally { setLoading(false); } + }; + const reloadEvents = async () => { + setLoading(true); + try { + setEvents(await billingAPI.events({ domain: domainFilter || undefined, limit: 500 })); + } catch (err) { onToast?.(`Failed to load: ${err.message}`, 'error'); } + finally { setLoading(false); } + }; + const reloadOverview = async () => { + setLoading(true); + try { + setOverview(await billingAPI.volumeOverview({ ym: ymFilter })); + } catch (err) { onToast?.(`Failed to load: ${err.message}`, 'error'); } + finally { setLoading(false); } + }; + const reloadDrilldown = async () => { + if (!drilldownDomain) return; + setLoading(true); + try { + setDrilldownData(await billingAPI.volume({ domain: drilldownDomain, ym: ymFilter })); + } catch (err) { onToast?.(`Failed to load: ${err.message}`, 'error'); } + finally { setLoading(false); } }; + // Reset everything when modal opens useEffect(() => { if (open) { setActiveTab('summary'); @@ -81,15 +98,23 @@ const BillingModal = ({ open, domains, onClose, onToast }) => { setYmFilter(currentYm()); setSummary(null); setEvents([]); - setVolume(null); + setOverview(null); + setDrilldownDomain(null); + setDrilldownData(null); } - // eslint-disable-next-line react-hooks/exhaustive-deps }, [open]); + // Load data depending on which view is active useEffect(() => { - reload(); + if (!open) return; + if (activeTab === 'summary') reloadSummary(); + else if (activeTab === 'events') reloadEvents(); + else if (activeTab === 'volume') { + if (drilldownDomain) reloadDrilldown(); + else reloadOverview(); + } // eslint-disable-next-line react-hooks/exhaustive-deps - }, [open, activeTab, domainFilter, ymFilter]); + }, [open, activeTab, domainFilter, ymFilter, drilldownDomain]); return ( { return ( +
{domain} · {monthLabel}
- + {
- {/* Per-inbox breakdown */} {volume.per_inbox.length === 0 ? (

No SES events recorded for {domain} in {monthLabel}. @@ -384,10 +556,7 @@ const VolumeView = ({ volume, domain, ymFilter }) => {

{v.email} {isProblematic(v) && ( - + )}
diff --git a/frontend/src/services/api.js b/frontend/src/services/api.js index 940c77d..4832c43 100644 --- a/frontend/src/services/api.js +++ b/frontend/src/services/api.js @@ -46,29 +46,17 @@ export const mailboxesAPI = { remove: async (email) => (await api.delete(`/api/mailboxes/${encodeURIComponent(email)}`)).data, setPassword: async (email, password) => - (await api.post( - `/api/mailboxes/${encodeURIComponent(email)}/password`, - { password } - )).data, + (await api.post(`/api/mailboxes/${encodeURIComponent(email)}/password`, { password })).data, setQuota: async (email, quota_gb) => - (await api.post( - `/api/mailboxes/${encodeURIComponent(email)}/quota`, - { quota_gb } - )).data, + (await api.post(`/api/mailboxes/${encodeURIComponent(email)}/quota`, { quota_gb })).data, getRules: async (email) => (await api.get(`/api/mailboxes/${encodeURIComponent(email)}/rules`)).data, putRules: async (email, payload) => - (await api.put( - `/api/mailboxes/${encodeURIComponent(email)}/rules`, - payload - )).data, + (await api.put(`/api/mailboxes/${encodeURIComponent(email)}/rules`, payload)).data, getBlocklist: async (email) => (await api.get(`/api/mailboxes/${encodeURIComponent(email)}/blocklist`)).data, putBlocklist: async (email, blocked_patterns) => - (await api.put( - `/api/mailboxes/${encodeURIComponent(email)}/blocklist`, - { blocked_patterns } - )).data, + (await api.put(`/api/mailboxes/${encodeURIComponent(email)}/blocklist`, { blocked_patterns })).data, }; export const auditAPI = { @@ -107,6 +95,12 @@ export const billingAPI = { if (ym) params.set('ym', ym); return (await api.get(`/api/billing/volume?${params.toString()}`)).data; }, + volumeOverview: async ({ ym } = {}) => { + const params = new URLSearchParams(); + if (ym) params.set('ym', ym); + const qs = params.toString(); + return (await api.get(`/api/billing/volume-overview${qs ? '?' + qs : ''}`)).data; + }, }; export const healthAPI = {