SES volume
This commit is contained in:
@@ -1,29 +1,23 @@
|
||||
import { Router } from 'express';
|
||||
import { requireAuth, requireSuperAdmin } from '../middleware/auth.js';
|
||||
import { computeMonthlyBilling, listBillingEvents, PRICE_PER_INBOX } from '../services/billing.js';
|
||||
import { getDomainVolumeForMonth, currentYm } from '../services/ses-events.js';
|
||||
import {
|
||||
getDomainVolumeForMonth,
|
||||
getVolumeOverview,
|
||||
currentYm,
|
||||
previousYm,
|
||||
} from '../services/ses-events.js';
|
||||
|
||||
export const billingRouter = Router();
|
||||
billingRouter.use(requireAuth);
|
||||
billingRouter.use(requireSuperAdmin);
|
||||
|
||||
/**
|
||||
* GET /api/billing/summary?domain=foo.com
|
||||
* Inbox-count based monthly summary ($5 per inbox per month).
|
||||
*/
|
||||
billingRouter.get('/summary', async (req, res) => {
|
||||
const domain = req.query.domain ? String(req.query.domain).toLowerCase() : undefined;
|
||||
const months = await computeMonthlyBilling({ domain });
|
||||
res.json({
|
||||
price_per_inbox: PRICE_PER_INBOX,
|
||||
months,
|
||||
});
|
||||
res.json({ price_per_inbox: PRICE_PER_INBOX, months });
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/billing/events?domain=foo.com
|
||||
* Raw mailbox lifecycle event log (created/deleted).
|
||||
*/
|
||||
billingRouter.get('/events', async (req, res) => {
|
||||
const events = await listBillingEvents({
|
||||
domain: req.query.domain ? String(req.query.domain).toLowerCase() : undefined,
|
||||
@@ -36,13 +30,7 @@ billingRouter.get('/events', async (req, res) => {
|
||||
|
||||
/**
|
||||
* GET /api/billing/volume?domain=foo.com&ym=2026-04
|
||||
* SES outbound volume for a domain in a specific month.
|
||||
*
|
||||
* Returns per-inbox + domain totals with send count, total bytes,
|
||||
* bounce count and complaint count.
|
||||
*
|
||||
* Defaults to the current month if ym is omitted.
|
||||
* Domain is required because volume is always per-domain.
|
||||
* Per-inbox drilldown for a single domain.
|
||||
*/
|
||||
billingRouter.get('/volume', async (req, res) => {
|
||||
const domain = req.query.domain ? String(req.query.domain).toLowerCase() : '';
|
||||
@@ -51,16 +39,38 @@ billingRouter.get('/volume', async (req, res) => {
|
||||
return;
|
||||
}
|
||||
|
||||
const ym = req.query.ym
|
||||
? String(req.query.ym)
|
||||
: currentYm();
|
||||
|
||||
// Validate ym format
|
||||
const ym = req.query.ym ? String(req.query.ym) : currentYm();
|
||||
if (!/^\d{4}-\d{2}$/.test(ym)) {
|
||||
res.status(400).json({ error: 'ym must be in YYYY-MM format' });
|
||||
return;
|
||||
}
|
||||
|
||||
const volume = await getDomainVolumeForMonth(domain, ym);
|
||||
res.json(volume);
|
||||
res.json(await getDomainVolumeForMonth(domain, ym));
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/billing/volume-overview?ym=2026-04
|
||||
* Cross-domain overview for super admin. One DynamoDB scan, aggregates
|
||||
* per domain. ym defaults to the current month.
|
||||
*
|
||||
* Returns:
|
||||
* {
|
||||
* ym: 'YYYY-MM',
|
||||
* total_send_count, total_bounce_count, total_complaint_count, total_inbox_count,
|
||||
* rows: [{ domain, send_count, bytes_total, bounce_count, complaint_count, inbox_count }, ...]
|
||||
* }
|
||||
*/
|
||||
billingRouter.get('/volume-overview', async (req, res) => {
|
||||
// Limit to current/previous month per product decision — no point in
|
||||
// accepting arbitrary ym values from the client right now.
|
||||
const ym = req.query.ym ? String(req.query.ym) : currentYm();
|
||||
const allowed = new Set([currentYm(), previousYm()]);
|
||||
if (!allowed.has(ym)) {
|
||||
res.status(400).json({
|
||||
error: `ym must be one of: ${[...allowed].join(', ')}`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
res.json(await getVolumeOverview(ym));
|
||||
});
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
|
||||
import { DynamoDBDocumentClient, QueryCommand } from '@aws-sdk/lib-dynamodb';
|
||||
import { DynamoDBDocumentClient, QueryCommand, ScanCommand } from '@aws-sdk/lib-dynamodb';
|
||||
import { config } from '../config.js';
|
||||
import { pool } from '../db.js';
|
||||
|
||||
@@ -13,7 +13,7 @@ const doc = DynamoDBDocumentClient.from(
|
||||
export interface InboxVolume {
|
||||
email: string;
|
||||
domain: string;
|
||||
ym: string; // 'YYYY-MM'
|
||||
ym: string;
|
||||
send_count: number;
|
||||
bytes_total: number;
|
||||
bounce_count: number;
|
||||
@@ -31,15 +31,27 @@ export interface DomainVolume {
|
||||
}
|
||||
|
||||
interface RawEventRow {
|
||||
pk?: string;
|
||||
event_type: 'send' | 'bounce' | 'complaint';
|
||||
size_bytes?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query all events for a single (email, ym) bucket and aggregate them
|
||||
* into counts. This is the lowest-level helper used by the higher-level
|
||||
* domain aggregator below.
|
||||
*/
|
||||
export function currentYm(): string {
|
||||
const now = new Date();
|
||||
return `${now.getUTCFullYear()}-${String(now.getUTCMonth() + 1).padStart(2, '0')}`;
|
||||
}
|
||||
|
||||
export function previousYm(): string {
|
||||
const now = new Date();
|
||||
// Construct first day of previous month in UTC, then format.
|
||||
const d = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1));
|
||||
return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Single-domain volume (used for the per-inbox drilldown)
|
||||
// ============================================================
|
||||
|
||||
async function aggregateInbox(domain: string, email: string, ym: string): Promise<InboxVolume> {
|
||||
const pk = `${domain}#${email}#${ym}`;
|
||||
|
||||
@@ -49,16 +61,12 @@ async function aggregateInbox(domain: string, email: string, ym: string): Promis
|
||||
let bounce_count = 0;
|
||||
let complaint_count = 0;
|
||||
|
||||
// DynamoDB Query is paginated — loop until done. For a single
|
||||
// mailbox-month the result set is normally small (< few thousand)
|
||||
// so this loops at most a couple of times.
|
||||
do {
|
||||
const resp = await doc.send(new QueryCommand({
|
||||
TableName: TABLE_NAME,
|
||||
KeyConditionExpression: 'pk = :pk',
|
||||
ExpressionAttributeValues: { ':pk': pk },
|
||||
ExclusiveStartKey: lastEvaluatedKey,
|
||||
// We only need a few fields for aggregation, not the full row.
|
||||
ProjectionExpression: 'event_type, size_bytes',
|
||||
}));
|
||||
|
||||
@@ -79,30 +87,25 @@ async function aggregateInbox(domain: string, email: string, ym: string): Promis
|
||||
return { email, domain, ym, send_count, bytes_total, bounce_count, complaint_count };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the full domain volume for a given month.
|
||||
*
|
||||
* Looks up all mailboxes (active + deleted) that ever existed in the
|
||||
* domain — we use the mailboxes table for that, which is cheap. Then
|
||||
* for each mailbox we aggregate the events. Mailboxes with zero events
|
||||
* are omitted from the per_inbox list (but they're free anyway).
|
||||
*/
|
||||
export async function getDomainVolumeForMonth(domain: string, ym: string): Promise<DomainVolume> {
|
||||
const d = domain.toLowerCase();
|
||||
|
||||
// Pull every mailbox that ever existed for this domain. Even
|
||||
// soft-deleted ones may have sent mails before deletion, and we
|
||||
// want to show those in historical months.
|
||||
const result = await pool.query(
|
||||
`SELECT email_address FROM mailboxes WHERE domain=$1`,
|
||||
[d],
|
||||
);
|
||||
const emails: string[] = result.rows.map((r: any) => String(r.email_address));
|
||||
const emailsFromPg: string[] = result.rows.map((r: any) => String(r.email_address));
|
||||
|
||||
// Aggregate in parallel — DynamoDB is fine with this.
|
||||
const perInbox = await Promise.all(emails.map((e) => aggregateInbox(d, e, ym)));
|
||||
// Also discover any email addresses that have events in DynamoDB but are
|
||||
// no longer in the local PostgreSQL (e.g. mailboxes that were created on
|
||||
// a different node, or hard-deleted from PostgreSQL but still have events).
|
||||
// We do a targeted Scan filtered by the domain prefix on pk.
|
||||
const emailsFromDdb = await listEmailsForDomain(d, ym);
|
||||
|
||||
const allEmails = Array.from(new Set([...emailsFromPg, ...emailsFromDdb]));
|
||||
|
||||
const perInbox = await Promise.all(allEmails.map((e) => aggregateInbox(d, e, ym)));
|
||||
|
||||
// Drop empty entries; sort the rest by send_count desc.
|
||||
const nonEmpty = perInbox.filter(
|
||||
(v) => v.send_count > 0 || v.bounce_count > 0 || v.complaint_count > 0,
|
||||
);
|
||||
@@ -118,19 +121,148 @@ export async function getDomainVolumeForMonth(domain: string, ym: string): Promi
|
||||
{ send_count: 0, bytes_total: 0, bounce_count: 0, complaint_count: 0 },
|
||||
);
|
||||
|
||||
return {
|
||||
domain: d,
|
||||
ym,
|
||||
...totals,
|
||||
per_inbox: nonEmpty,
|
||||
};
|
||||
return { domain: d, ym, ...totals, per_inbox: nonEmpty };
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience: get volume for the current month across all mailboxes
|
||||
* in a domain.
|
||||
* Find the set of email addresses that have any event for the given
|
||||
* (domain, ym) bucket. Used to ensure the per-inbox drilldown surfaces
|
||||
* historical mailboxes that no longer exist in the local PostgreSQL.
|
||||
*
|
||||
* Uses a Scan with a begins_with filter on pk because we don't have
|
||||
* a GSI on domain. For typical event volumes this is fast enough; a
|
||||
* GSI can be added later if it becomes a hot spot.
|
||||
*/
|
||||
export function currentYm(): string {
|
||||
const now = new Date();
|
||||
return `${now.getUTCFullYear()}-${String(now.getUTCMonth() + 1).padStart(2, '0')}`;
|
||||
async function listEmailsForDomain(domain: string, ym: string): Promise<string[]> {
|
||||
const prefix = `${domain}#`;
|
||||
const suffix = `#${ym}`;
|
||||
const emails = new Set<string>();
|
||||
let lastEvaluatedKey: Record<string, unknown> | undefined;
|
||||
|
||||
do {
|
||||
const resp = await doc.send(new ScanCommand({
|
||||
TableName: TABLE_NAME,
|
||||
FilterExpression: 'begins_with(pk, :p)',
|
||||
ExpressionAttributeValues: { ':p': prefix },
|
||||
ProjectionExpression: 'pk',
|
||||
ExclusiveStartKey: lastEvaluatedKey,
|
||||
}));
|
||||
|
||||
for (const row of (resp.Items ?? []) as { pk?: string }[]) {
|
||||
if (!row.pk) continue;
|
||||
// pk format: "domain#email#YYYY-MM"
|
||||
if (!row.pk.endsWith(suffix)) continue;
|
||||
const inner = row.pk.slice(prefix.length, row.pk.length - suffix.length);
|
||||
if (inner) emails.add(inner);
|
||||
}
|
||||
|
||||
lastEvaluatedKey = resp.LastEvaluatedKey;
|
||||
} while (lastEvaluatedKey);
|
||||
|
||||
return [...emails];
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// All-domains overview (single Scan, no per-domain Query loop)
|
||||
// ============================================================
|
||||
|
||||
export interface DomainOverviewRow {
|
||||
domain: string;
|
||||
send_count: number;
|
||||
bytes_total: number;
|
||||
bounce_count: number;
|
||||
complaint_count: number;
|
||||
inbox_count: number;
|
||||
}
|
||||
|
||||
export interface VolumeOverview {
|
||||
ym: string;
|
||||
total_send_count: number;
|
||||
total_bounce_count: number;
|
||||
total_complaint_count: number;
|
||||
total_inbox_count: number;
|
||||
rows: DomainOverviewRow[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Cross-domain overview. One Scan reads all events for the given month,
|
||||
* aggregated per domain. We rely on the fact that pk encodes both the
|
||||
* domain and the ym, so a server-side FilterExpression cuts the scanned
|
||||
* set down to just this month.
|
||||
*
|
||||
* Note: DynamoDB Scan still reads the entire table for billing, but the
|
||||
* filter reduces network bytes returned. For our event volumes that's
|
||||
* acceptable. If the table grows past a few hundred thousand items per
|
||||
* month we should add a GSI keyed on (ym).
|
||||
*/
|
||||
export async function getVolumeOverview(ym: string): Promise<VolumeOverview> {
|
||||
const suffix = `#${ym}`;
|
||||
let lastEvaluatedKey: Record<string, unknown> | undefined;
|
||||
|
||||
// Per-domain accumulators with a Set for unique emails.
|
||||
const acc = new Map<string, {
|
||||
send: number; bytes: number; bounce: number; complaint: number; inboxes: Set<string>;
|
||||
}>();
|
||||
|
||||
do {
|
||||
const resp = await doc.send(new ScanCommand({
|
||||
TableName: TABLE_NAME,
|
||||
// Filter at the server: only pk values ending in this month.
|
||||
// DynamoDB doesn't support "ends_with" on a key, so we use
|
||||
// contains() — works because '#2026-04' is unique enough.
|
||||
FilterExpression: 'contains(pk, :ym)',
|
||||
ExpressionAttributeValues: { ':ym': suffix },
|
||||
ProjectionExpression: 'pk, event_type, size_bytes',
|
||||
ExclusiveStartKey: lastEvaluatedKey,
|
||||
}));
|
||||
|
||||
for (const row of (resp.Items ?? []) as RawEventRow[]) {
|
||||
if (!row.pk || !row.pk.endsWith(suffix)) continue;
|
||||
// Strip the trailing "#ym" then split into [domain, email].
|
||||
const head = row.pk.slice(0, row.pk.length - suffix.length);
|
||||
const sep = head.indexOf('#');
|
||||
if (sep < 0) continue;
|
||||
const domain = head.slice(0, sep);
|
||||
const email = head.slice(sep + 1);
|
||||
|
||||
let entry = acc.get(domain);
|
||||
if (!entry) {
|
||||
entry = { send: 0, bytes: 0, bounce: 0, complaint: 0, inboxes: new Set() };
|
||||
acc.set(domain, entry);
|
||||
}
|
||||
entry.inboxes.add(email);
|
||||
|
||||
if (row.event_type === 'send') {
|
||||
entry.send++;
|
||||
entry.bytes += Number(row.size_bytes ?? 0);
|
||||
} else if (row.event_type === 'bounce') {
|
||||
entry.bounce++;
|
||||
} else if (row.event_type === 'complaint') {
|
||||
entry.complaint++;
|
||||
}
|
||||
}
|
||||
|
||||
lastEvaluatedKey = resp.LastEvaluatedKey;
|
||||
} while (lastEvaluatedKey);
|
||||
|
||||
const rows: DomainOverviewRow[] = [...acc.entries()].map(([domain, e]) => ({
|
||||
domain,
|
||||
send_count: e.send,
|
||||
bytes_total: e.bytes,
|
||||
bounce_count: e.bounce,
|
||||
complaint_count: e.complaint,
|
||||
inbox_count: e.inboxes.size,
|
||||
})).sort((a, b) => b.send_count - a.send_count || a.domain.localeCompare(b.domain));
|
||||
|
||||
const totals = rows.reduce(
|
||||
(s, r) => ({
|
||||
total_send_count: s.total_send_count + r.send_count,
|
||||
total_bounce_count: s.total_bounce_count + r.bounce_count,
|
||||
total_complaint_count: s.total_complaint_count + r.complaint_count,
|
||||
total_inbox_count: s.total_inbox_count + r.inbox_count,
|
||||
}),
|
||||
{ total_send_count: 0, total_bounce_count: 0, total_complaint_count: 0, total_inbox_count: 0 },
|
||||
);
|
||||
|
||||
return { ym, ...totals, rows };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user