Moved worker modules and updated imports accordingly

This commit is contained in:
2026-03-07 14:59:41 -06:00
parent 908bb76c3a
commit c826d4c299
19 changed files with 3209 additions and 16 deletions

View File

@@ -0,0 +1,230 @@
/**
* DynamoDB operations handler
*
* Tables:
* - email-rules → OOO / Forward rules per address
* - ses-outbound-messages → Bounce info (MessageId → original sender)
* - email-blocked-senders → Blocked patterns per address
*/
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import {
DynamoDBDocumentClient,
GetCommand,
BatchGetCommand,
} from '@aws-sdk/lib-dynamodb';
import { config } from '../config.js';
import { log } from '../logger.js';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** Per-address rule record from the email-rules table. */
export interface EmailRule {
  /** Partition key: the mailbox address this rule applies to. */
  email_address: string;
  /** When true, an out-of-office auto-reply is sent. */
  ooo_active?: boolean;
  /** Body text of the out-of-office reply. */
  ooo_message?: string;
  /** Reply body format — presumably 'text' or 'html'; confirm against RulesProcessor. */
  ooo_content_type?: string;
  /** Addresses to forward a copy of incoming mail to. */
  forwards?: string[];
  /** DynamoDB items may carry extra attributes we do not model explicitly. */
  [key: string]: unknown;
}
/** Bounce record from the ses-outbound-messages table (keyed by MessageId). */
export interface BounceInfo {
  /** Sender of the original outbound message that bounced. */
  original_source: string;
  /** SES bounce classification ('Unknown' when the attribute is absent). */
  bounceType: string;
  /** SES bounce sub-classification ('Unknown' when absent). */
  bounceSubType: string;
  /** Recipient addresses that bounced. */
  bouncedRecipients: string[];
  /** Record timestamp as stored in DynamoDB (string form, empty when absent). */
  timestamp: string;
}
// ---------------------------------------------------------------------------
// Handler
// ---------------------------------------------------------------------------
/**
 * Single access point for all DynamoDB lookups:
 *  - getEmailRules          → email-rules table
 *  - getBounceInfo          → ses-outbound-messages table (with retries)
 *  - get/batchGet patterns  → email-blocked-senders table
 *
 * All read paths degrade gracefully: when `available` is false or a call
 * fails, they return null / empty results rather than throwing.
 */
export class DynamoDBHandler {
  private docClient: DynamoDBDocumentClient;
  // Set true by initialize(); flipped to false if verifyTables() fails,
  // which short-circuits every subsequent lookup.
  public available = false;

  constructor() {
    const raw = new DynamoDBClient({ region: config.awsRegion });
    // removeUndefinedValues: skip undefined attributes when marshalling.
    this.docClient = DynamoDBDocumentClient.from(raw, {
      marshallOptions: { removeUndefinedValues: true },
    });
    this.initialize();
  }

  // -----------------------------------------------------------------------
  // Init
  // -----------------------------------------------------------------------
  private initialize(): void {
    // We just mark as available; actual connectivity is tested on first call.
    // The Python version tested table_status, but that's a DescribeTable call
    // which is heavy and not needed — the first GetItem will tell us.
    this.available = true;
    log('✓ DynamoDB client initialized');
  }

  /**
   * Verify tables exist by doing a cheap GetItem on each.
   * Called once during startup; updates `available` with the outcome.
   * A missing table raises; a missing item does not — so sentinel keys
   * are a safe, cheap probe.
   */
  async verifyTables(): Promise<boolean> {
    try {
      await Promise.all([
        this.docClient.send(
          new GetCommand({ TableName: config.rulesTable, Key: { email_address: '__probe__' } }),
        ),
        this.docClient.send(
          new GetCommand({ TableName: config.messagesTable, Key: { MessageId: '__probe__' } }),
        ),
        this.docClient.send(
          new GetCommand({ TableName: config.blockedTable, Key: { email_address: '__probe__' } }),
        ),
      ]);
      this.available = true;
      log('✓ DynamoDB tables connected successfully');
      return true;
    } catch (err: any) {
      log(`⚠ DynamoDB not fully available: ${err.message ?? err}`, 'WARNING');
      this.available = false;
      return false;
    }
  }

  // -----------------------------------------------------------------------
  // Email rules
  // -----------------------------------------------------------------------
  /** Fetch the OOO/forward rule record for an address, or null when absent or unavailable. */
  async getEmailRules(emailAddress: string): Promise<EmailRule | null> {
    if (!this.available) return null;
    try {
      const resp = await this.docClient.send(
        new GetCommand({
          TableName: config.rulesTable,
          Key: { email_address: emailAddress },
        }),
      );
      return (resp.Item as EmailRule) ?? null;
    } catch (err: any) {
      // A missing table is tolerated silently; anything else is logged.
      if (err.name !== 'ResourceNotFoundException') {
        log(`⚠ DynamoDB error for ${emailAddress}: ${err.message ?? err}`, 'ERROR');
      }
      return null;
    }
  }

  // -----------------------------------------------------------------------
  // Bounce info
  // -----------------------------------------------------------------------
  /**
   * Look up bounce details for a MessageId, retrying with a delay because
   * the bounce record may be written asynchronously after the notification
   * arrives. Returns null after config.bounceLookupRetries failed attempts.
   */
  async getBounceInfo(
    messageId: string,
    workerName = 'unified',
  ): Promise<BounceInfo | null> {
    if (!this.available) return null;
    for (let attempt = 0; attempt < config.bounceLookupRetries; attempt++) {
      try {
        const resp = await this.docClient.send(
          new GetCommand({
            TableName: config.messagesTable,
            Key: { MessageId: messageId },
          }),
        );
        if (resp.Item) {
          // Normalize missing attributes to safe defaults.
          return {
            original_source: (resp.Item.original_source as string) ?? '',
            bounceType: (resp.Item.bounceType as string) ?? 'Unknown',
            bounceSubType: (resp.Item.bounceSubType as string) ?? 'Unknown',
            bouncedRecipients: (resp.Item.bouncedRecipients as string[]) ?? [],
            timestamp: (resp.Item.timestamp as string) ?? '',
          };
        }
        if (attempt < config.bounceLookupRetries - 1) {
          log(
            ` Bounce record not found yet, retrying in ${config.bounceLookupDelay}s ` +
              `(attempt ${attempt + 1}/${config.bounceLookupRetries})...`,
            'INFO',
            workerName,
          );
          await sleep(config.bounceLookupDelay * 1000);
        } else {
          log(
            `⚠ No bounce record found after ${config.bounceLookupRetries} attempts ` +
              `for Message-ID: ${messageId}`,
            'WARNING',
            workerName,
          );
          return null;
        }
      } catch (err: any) {
        log(
          `⚠ DynamoDB Error (attempt ${attempt + 1}/${config.bounceLookupRetries}): ` +
            `${err.message ?? err}`,
          'ERROR',
          workerName,
        );
        if (attempt < config.bounceLookupRetries - 1) {
          await sleep(config.bounceLookupDelay * 1000);
        } else {
          return null;
        }
      }
    }
    return null;
  }

  // -----------------------------------------------------------------------
  // Blocked senders
  // -----------------------------------------------------------------------
  /** Fetch the blocked-sender glob patterns for one address ([] when none or unavailable). */
  async getBlockedPatterns(emailAddress: string): Promise<string[]> {
    if (!this.available) return [];
    try {
      const resp = await this.docClient.send(
        new GetCommand({
          TableName: config.blockedTable,
          Key: { email_address: emailAddress },
        }),
      );
      return (resp.Item?.blocked_patterns as string[]) ?? [];
    } catch (err: any) {
      log(`⚠ Error getting block list for ${emailAddress}: ${err.message ?? err}`, 'ERROR');
      return [];
    }
  }

  /**
   * Batch lookup of blocked patterns for many addresses in one request.
   * Always returns an entry (possibly []) for every requested address.
   * NOTE(review): BatchGetItem is limited to 100 keys per request; this
   * assumes per-message recipient lists stay far below that — confirm.
   */
  async batchGetBlockedPatterns(
    emailAddresses: string[],
  ): Promise<Record<string, string[]>> {
    // Pre-seed the result so callers never see a missing key.
    const empty: Record<string, string[]> = {};
    for (const a of emailAddresses) empty[a] = [];
    if (!this.available || emailAddresses.length === 0) return empty;
    try {
      const keys = emailAddresses.map((a) => ({ email_address: a }));
      const resp = await this.docClient.send(
        new BatchGetCommand({
          RequestItems: {
            [config.blockedTable]: { Keys: keys },
          },
        }),
      );
      const items = resp.Responses?.[config.blockedTable] ?? [];
      const result: Record<string, string[]> = { ...empty };
      for (const item of items) {
        const addr = item.email_address as string;
        result[addr] = (item.blocked_patterns as string[]) ?? [];
      }
      return result;
    } catch (err: any) {
      log(`⚠ Batch blocklist check error: ${err.message ?? err}`, 'ERROR');
      return empty;
    }
  }
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Promise-based delay used between bounce-lookup retries. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

View File

@@ -0,0 +1,202 @@
/**
* S3 operations handler
*
* Responsibilities:
* - Download raw email from domain-specific bucket
* - Mark email metadata (processed / all-invalid / blocked)
* - Delete blocked emails
*/
import {
S3Client,
GetObjectCommand,
HeadObjectCommand,
CopyObjectCommand,
DeleteObjectCommand,
type S3ClientConfig,
} from '@aws-sdk/client-s3';
import { config, domainToBucketName } from '../config.js';
import { log } from '../logger.js';
/**
 * Handles all S3 interaction for raw email objects stored in
 * per-domain buckets (bucket name derived via domainToBucketName).
 */
export class S3Handler {
  private client: S3Client;

  constructor() {
    const opts: S3ClientConfig = { region: config.awsRegion };
    this.client = new S3Client(opts);
  }

  // -------------------------------------------------------------------------
  // Download
  // -------------------------------------------------------------------------
  /**
   * Download raw email bytes from S3.
   * Returns `null` when the object does not exist yet (caller should retry);
   * after 5 receive attempts a missing object is treated as permanent.
   * Throws on permanent errors.
   */
  async getEmail(
    domain: string,
    messageId: string,
    receiveCount: number,
  ): Promise<Buffer | null> {
    const bucket = domainToBucketName(domain);
    try {
      const resp = await this.client.send(
        new GetObjectCommand({ Bucket: bucket, Key: messageId }),
      );
      const bytes = await resp.Body?.transformToByteArray();
      return bytes ? Buffer.from(bytes) : null;
    } catch (err: any) {
      // The SDK surfaces the missing-key error as either err.name or err.Code.
      if (err.name === 'NoSuchKey' || err.Code === 'NoSuchKey') {
        if (receiveCount < 5) {
          log(`⏳ S3 Object not found yet (Attempt ${receiveCount}). Retrying...`, 'WARNING');
          return null;
        }
        log('❌ S3 Object missing permanently after retries.', 'ERROR');
        throw err;
      }
      log(`❌ S3 Download Error: ${err.message ?? err}`, 'ERROR');
      throw err;
    }
  }

  // -------------------------------------------------------------------------
  // Metadata helpers (copy-in-place with updated metadata)
  // -------------------------------------------------------------------------
  /**
   * Update an object's user metadata in place.
   * S3 metadata is immutable, so this does a HeadObject to read the current
   * metadata, applies `patch`, removes `removeKeys`, and rewrites the object
   * via CopyObject onto itself with MetadataDirective REPLACE.
   */
  private async updateMetadata(
    bucket: string,
    key: string,
    patch: Record<string, string>,
    removeKeys: string[] = [],
  ): Promise<void> {
    const head = await this.client.send(
      new HeadObjectCommand({ Bucket: bucket, Key: key }),
    );
    const metadata = { ...(head.Metadata ?? {}) };
    // Apply patch
    for (const [k, v] of Object.entries(patch)) {
      metadata[k] = v;
    }
    // Remove keys
    for (const k of removeKeys) {
      delete metadata[k];
    }
    await this.client.send(
      new CopyObjectCommand({
        Bucket: bucket,
        Key: key,
        CopySource: `${bucket}/${key}`,
        Metadata: metadata,
        MetadataDirective: 'REPLACE',
      }),
    );
  }

  // -------------------------------------------------------------------------
  // Mark helpers
  // -------------------------------------------------------------------------
  /**
   * Mark a message as delivered; records any invalid recipient inboxes and
   * clears transient queue-tracking metadata keys. Failures are logged only
   * (marking is best-effort; the email itself was already delivered).
   */
  async markAsProcessed(
    domain: string,
    messageId: string,
    workerName: string,
    invalidInboxes?: string[],
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      const patch: Record<string, string> = {
        processed: 'true',
        processed_at: String(Math.floor(Date.now() / 1000)),
        processed_by: workerName,
        status: 'delivered',
      };
      if (invalidInboxes?.length) {
        patch['invalid_inboxes'] = invalidInboxes.join(',');
        log(`⚠ Invalid inboxes recorded: ${invalidInboxes.join(', ')}`, 'WARNING', workerName);
      }
      await this.updateMetadata(bucket, messageId, patch, [
        'processing_started',
        'queued_at',
      ]);
    } catch (err: any) {
      log(`Failed to mark as processed: ${err.message ?? err}`, 'WARNING', workerName);
    }
  }

  /**
   * Mark a message as failed because every recipient's mailbox is invalid.
   * Best-effort: failures are logged, not rethrown.
   */
  async markAsAllInvalid(
    domain: string,
    messageId: string,
    invalidInboxes: string[],
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.updateMetadata(
        bucket,
        messageId,
        {
          processed: 'true',
          processed_at: String(Math.floor(Date.now() / 1000)),
          processed_by: workerName,
          status: 'failed',
          error: 'All recipients are invalid (mailboxes do not exist)',
          invalid_inboxes: invalidInboxes.join(','),
        },
        ['processing_started', 'queued_at'],
      );
    } catch (err: any) {
      log(`Failed to mark as all invalid: ${err.message ?? err}`, 'WARNING', workerName);
    }
  }

  /**
   * Mark a message as blocked (sender matched a blocklist pattern).
   * Rethrows on failure — unlike the other mark helpers — so the caller
   * knows the blocked state was not recorded.
   */
  async markAsBlocked(
    domain: string,
    messageId: string,
    blockedRecipients: string[],
    sender: string,
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.updateMetadata(
        bucket,
        messageId,
        {
          processed: 'true',
          processed_at: String(Math.floor(Date.now() / 1000)),
          processed_by: workerName,
          status: 'blocked',
          blocked_recipients: blockedRecipients.join(','),
          blocked_sender: sender,
        },
        ['processing_started', 'queued_at'],
      );
      log('✓ Marked as blocked in S3 metadata', 'INFO', workerName);
    } catch (err: any) {
      log(`⚠ Failed to mark as blocked: ${err.message ?? err}`, 'ERROR', workerName);
      throw err;
    }
  }

  /** Delete a blocked email object outright. Rethrows on failure. */
  async deleteBlockedEmail(
    domain: string,
    messageId: string,
    workerName: string,
  ): Promise<void> {
    const bucket = domainToBucketName(domain);
    try {
      await this.client.send(
        new DeleteObjectCommand({ Bucket: bucket, Key: messageId }),
      );
      log('🗑 Deleted blocked email from S3', 'SUCCESS', workerName);
    } catch (err: any) {
      log(`⚠ Failed to delete blocked email: ${err.message ?? err}`, 'ERROR', workerName);
      throw err;
    }
  }
}

View File

@@ -0,0 +1,52 @@
/**
* SES operations handler
*
* Only used for:
* - Sending OOO replies to external addresses
* - Forwarding to external addresses
*/
import {
SESClient,
SendRawEmailCommand,
} from '@aws-sdk/client-ses';
import { config } from '../config.js';
import { log } from '../logger.js';
/**
 * Thin wrapper around a single SESClient. Used only for sending raw MIME
 * messages (OOO replies and forwards) to external addresses.
 */
export class SESHandler {
  private client: SESClient;

  constructor() {
    this.client = new SESClient({ region: config.awsRegion });
  }

  /**
   * Send a raw MIME message via SES.
   * Returns true on success, false on failure (never throws).
   */
  async sendRawEmail(
    source: string,
    destination: string,
    rawMessage: Buffer,
    workerName: string,
  ): Promise<boolean> {
    try {
      const command = new SendRawEmailCommand({
        Source: source,
        Destinations: [destination],
        RawMessage: { Data: rawMessage },
      });
      await this.client.send(command);
      return true;
    } catch (err: any) {
      // The SDK exposes the error code as either .name or .Code.
      const code = err.name ?? err.Code ?? 'Unknown';
      log(
        `⚠ SES send failed to ${destination} (${code}): ${err.message ?? err}`,
        'ERROR',
        workerName,
      );
      return false;
    }
  }
}

View File

@@ -0,0 +1,99 @@
/**
* SQS operations handler
*
* Responsibilities:
* - Resolve queue URL for a domain
* - Long-poll for messages
* - Delete processed messages
* - Report approximate queue size
*/
import {
SQSClient,
GetQueueUrlCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
GetQueueAttributesCommand,
type Message,
} from '@aws-sdk/client-sqs';
import { config, domainToQueueName } from '../config.js';
import { log } from '../logger.js';
/**
 * Wrapper around SQSClient for the per-domain email queues.
 * All reads are defensive (log and return a neutral value on error);
 * deleteMessage rethrows so callers know the message may reappear.
 */
export class SQSHandler {
  private client: SQSClient;

  constructor() {
    this.client = new SQSClient({ region: config.awsRegion });
  }

  /** Resolve queue URL for a domain. Returns null if queue does not exist. */
  async getQueueUrl(domain: string): Promise<string | null> {
    const queueName = domainToQueueName(domain);
    try {
      const resp = await this.client.send(
        new GetQueueUrlCommand({ QueueName: queueName }),
      );
      return resp.QueueUrl ?? null;
    } catch (err: any) {
      // The SDK may surface the missing-queue error under either field.
      if (err.name === 'QueueDoesNotExist' ||
        err.Code === 'AWS.SimpleQueueService.NonExistentQueue') {
        log(`Queue not found for domain: ${domain}`, 'WARNING');
      } else {
        log(`Error getting queue URL for ${domain}: ${err.message ?? err}`, 'ERROR');
      }
      return null;
    }
  }

  /**
   * Long-poll for messages (uses configured poll interval as wait time).
   * Returns [] on error so the caller's polling loop can simply continue.
   */
  async receiveMessages(queueUrl: string): Promise<Message[]> {
    try {
      const resp = await this.client.send(
        new ReceiveMessageCommand({
          QueueUrl: queueUrl,
          MaxNumberOfMessages: config.maxMessages,
          WaitTimeSeconds: config.pollInterval,
          VisibilityTimeout: config.visibilityTimeout,
          // ApproximateReceiveCount feeds the S3 download retry logic;
          // SentTimestamp is available for latency accounting.
          MessageSystemAttributeNames: ['ApproximateReceiveCount', 'SentTimestamp'],
        }),
      );
      return resp.Messages ?? [];
    } catch (err: any) {
      log(`Error receiving messages: ${err.message ?? err}`, 'ERROR');
      return [];
    }
  }

  /**
   * Delete a message from the queue after successful processing.
   * Rethrows on failure so the caller can avoid treating the message
   * as fully done when it may be redelivered.
   */
  async deleteMessage(queueUrl: string, receiptHandle: string): Promise<void> {
    try {
      await this.client.send(
        new DeleteMessageCommand({
          QueueUrl: queueUrl,
          ReceiptHandle: receiptHandle,
        }),
      );
    } catch (err: any) {
      log(`Error deleting message: ${err.message ?? err}`, 'ERROR');
      throw err;
    }
  }

  /** Approximate number of messages in the queue. Returns 0 on error. */
  async getQueueSize(queueUrl: string): Promise<number> {
    try {
      const resp = await this.client.send(
        new GetQueueAttributesCommand({
          QueueUrl: queueUrl,
          AttributeNames: ['ApproximateNumberOfMessages'],
        }),
      );
      return parseInt(
        resp.Attributes?.ApproximateNumberOfMessages ?? '0',
        10,
      );
    } catch {
      // Queue size is informational only; swallow errors deliberately.
      return 0;
    }
  }
}

View File

@@ -0,0 +1,115 @@
/**
* Configuration management for unified email worker
*
* All settings are read from environment variables with sensible defaults.
* Domain helpers (bucket name, queue name, internal check) are co-located here
* so every module can import { config, domainToBucket, ... } from './config'.
*/
import { readFileSync, existsSync } from 'node:fs';
// ---------------------------------------------------------------------------
// Config object — every value is read from the environment with a default.
// ---------------------------------------------------------------------------
export const config = {
  // AWS
  awsRegion: process.env.AWS_REGION ?? 'us-east-2',
  // Domains: comma-separated env list and/or newline-delimited file
  // (both are merged by loadDomains()).
  domainsList: process.env.DOMAINS ?? '',
  domainsFile: process.env.DOMAINS_FILE ?? '/etc/email-worker/domains.txt',
  // Worker
  workerThreads: parseInt(process.env.WORKER_THREADS ?? '10', 10),
  // Seconds; used as the SQS long-poll WaitTimeSeconds.
  pollInterval: parseInt(process.env.POLL_INTERVAL ?? '20', 10),
  // Max messages per ReceiveMessage call (SQS allows up to 10).
  maxMessages: parseInt(process.env.MAX_MESSAGES ?? '10', 10),
  // Seconds a received message stays hidden from other consumers.
  visibilityTimeout: parseInt(process.env.VISIBILITY_TIMEOUT ?? '300', 10),
  // SMTP delivery (local DMS)
  smtpHost: process.env.SMTP_HOST ?? 'localhost',
  smtpPort: parseInt(process.env.SMTP_PORT ?? '25', 10),
  smtpUseTls: (process.env.SMTP_USE_TLS ?? 'false').toLowerCase() === 'true',
  smtpUser: process.env.SMTP_USER ?? '',
  smtpPass: process.env.SMTP_PASS ?? '',
  smtpPoolSize: parseInt(process.env.SMTP_POOL_SIZE ?? '5', 10),
  // Internal SMTP port (for OOO / forwards to managed domains)
  internalSmtpPort: parseInt(process.env.INTERNAL_SMTP_PORT ?? '25', 10),
  // DynamoDB tables
  rulesTable: process.env.DYNAMODB_RULES_TABLE ?? 'email-rules',
  messagesTable: process.env.DYNAMODB_MESSAGES_TABLE ?? 'ses-outbound-messages',
  blockedTable: process.env.DYNAMODB_BLOCKED_TABLE ?? 'email-blocked-senders',
  // Bounce handling: retry count and delay (seconds) between lookups.
  bounceLookupRetries: parseInt(process.env.BOUNCE_LOOKUP_RETRIES ?? '3', 10),
  bounceLookupDelay: parseFloat(process.env.BOUNCE_LOOKUP_DELAY ?? '1.0'),
  // Monitoring
  metricsPort: parseInt(process.env.METRICS_PORT ?? '8000', 10),
  healthPort: parseInt(process.env.HEALTH_PORT ?? '8080', 10),
} as const;

// Exported so modules can type helper parameters against the config shape.
export type Config = typeof config;
// ---------------------------------------------------------------------------
// Managed domains (populated by loadDomains())
// ---------------------------------------------------------------------------
// Lower-cased set of the domains we manage; consulted by isInternalAddress().
const managedDomains = new Set<string>();
/**
 * Load domains from the DOMAINS env var and/or the domains file.
 * Populates the module-level `managedDomains` set (lower-cased) and
 * returns the deduplicated list in first-seen order.
 */
export function loadDomains(): string[] {
  const collected: string[] = [];

  // Comma-separated list from the environment.
  if (config.domainsList) {
    for (const entry of config.domainsList.split(',')) {
      const domain = entry.trim();
      if (domain) collected.push(domain);
    }
  }

  // One domain per line from the file; lines starting with '#' are comments.
  if (existsSync(config.domainsFile)) {
    const lines = readFileSync(config.domainsFile, 'utf-8').split('\n');
    for (const line of lines) {
      const domain = line.trim();
      if (domain && !domain.startsWith('#')) collected.push(domain);
    }
  }

  // Deduplicate, then refresh the lookup set.
  const unique = [...new Set(collected)];
  managedDomains.clear();
  for (const domain of unique) {
    managedDomains.add(domain.toLowerCase());
  }
  return unique;
}
// ---------------------------------------------------------------------------
// Domain helpers
// ---------------------------------------------------------------------------
/** Check whether an email address belongs to one of our managed domains. */
export function isInternalAddress(email: string): boolean {
  const at = email.indexOf('@');
  if (at < 0) return false;
  const domain = email.slice(at + 1).toLowerCase();
  return managedDomains.has(domain);
}
/** Convert domain to SQS queue name: bizmatch.net → bizmatch-net-queue */
export function domainToQueueName(domain: string): string {
  return `${domain.split('.').join('-')}-queue`;
}
/** Convert domain to S3 bucket name: bizmatch.net → bizmatch-net-emails */
export function domainToBucketName(domain: string): string {
  return `${domain.split('.').join('-')}-emails`;
}

View File

@@ -0,0 +1,62 @@
/**
* Sender blocklist checking with wildcard / glob support
*
* Uses picomatch for pattern matching (equivalent to Python's fnmatch).
* Patterns are stored per-recipient in DynamoDB.
*/
import picomatch from 'picomatch';
import type { DynamoDBHandler } from '../aws/dynamodb.js';
import { log } from '../logger.js';
/**
 * Extract the bare email address from a From header value.
 * "John Doe <john@example.com>" → "john@example.com"
 * Falls back to the whole input (trimmed, lower-cased) when no
 * angle-bracketed address is present.
 */
function extractAddress(sender: string): string {
  const angled = /<([^>]+)>/.exec(sender);
  const bare = angled ? angled[1] : sender;
  return bare.trim().toLowerCase();
}
/**
 * Checks senders against per-recipient blocklists stored in DynamoDB.
 * Patterns are fnmatch-style globs, matched via picomatch.
 */
export class BlocklistChecker {
  constructor(private dynamodb: DynamoDBHandler) {}

  /**
   * Batch-check whether a sender is blocked for each recipient.
   * Uses a single batch DynamoDB call for efficiency.
   */
  async batchCheckBlockedSenders(
    recipients: string[],
    sender: string,
    workerName: string,
  ): Promise<Record<string, boolean>> {
    const allPatterns = await this.dynamodb.batchGetBlockedPatterns(recipients);
    const senderClean = extractAddress(sender);
    const blocked: Record<string, boolean> = {};

    for (const recipient of recipients) {
      // First matching pattern wins; log it for audit purposes.
      const hit = (allPatterns[recipient] ?? []).find((pattern) =>
        picomatch.isMatch(senderClean, pattern.toLowerCase()),
      );
      if (hit !== undefined) {
        log(
          `⛔ BLOCKED: Sender ${senderClean} matches pattern '${hit}' ` +
            `for inbox ${recipient}`,
          'WARNING',
          workerName,
        );
      }
      blocked[recipient] = hit !== undefined;
    }
    return blocked;
  }
}

View File

@@ -0,0 +1,190 @@
/**
* Bounce detection and header rewriting
*
* When Amazon SES returns a bounce, the From header is
* mailer-daemon@amazonses.com. We look up the original sender
* in DynamoDB and rewrite the headers so the bounce appears
* to come from the actual bounced recipient.
*/
import type { ParsedMail } from 'mailparser';
import type { DynamoDBHandler } from '../aws/dynamodb.js';
import { isSesBounceNotification, getHeader } from './parser.js';
import { log } from '../logger.js';
/** Outcome of BounceHandler.applyBounceLogic(). */
export interface BounceResult {
  /** Updated raw bytes (headers rewritten if bounce was detected) */
  rawBytes: Buffer;
  /** Whether bounce was detected and headers were modified */
  modified: boolean;
  /** Whether this email is a bounce notification at all */
  isBounce: boolean;
  /** The effective From address (rewritten or original) */
  fromAddr: string;
}
export class BounceHandler {
  constructor(private dynamodb: DynamoDBHandler) {}

  /**
   * Detect SES bounce, look up original sender in DynamoDB,
   * and rewrite headers in the raw buffer.
   *
   * We operate on the raw Buffer because we need to preserve
   * the original MIME structure exactly, only swapping specific
   * header lines. mailparser's ParsedMail is read-only.
   *
   * @param parsed     parsed view of the same message (read-only)
   * @param rawBytes   raw MIME bytes, returned rewritten on success
   * @param subject    subject line, used to normalize generic DSN subjects
   * @param workerName label used for log attribution
   */
  async applyBounceLogic(
    parsed: ParsedMail,
    rawBytes: Buffer,
    subject: string,
    workerName = 'unified',
  ): Promise<BounceResult> {
    // Fast path: not a bounce — pass the message through untouched.
    if (!isSesBounceNotification(parsed)) {
      return {
        rawBytes,
        modified: false,
        isBounce: false,
        fromAddr: parsed.from?.text ?? '',
      };
    }
    log('🔍 Detected SES MAILER-DAEMON bounce notification', 'INFO', workerName);
    // Extract Message-ID from the bounce notification header:
    // strip surrounding angle brackets, keep the part before '@'
    // (presumably matching how ses-outbound-messages keys are stored
    // by the sending pipeline — confirm against that writer).
    const rawMessageId = getHeader(parsed, 'message-id')
      .replace(/^</, '')
      .replace(/>$/, '')
      .split('@')[0];
    if (!rawMessageId) {
      log('⚠ Could not extract Message-ID from bounce notification', 'WARNING', workerName);
      return {
        rawBytes,
        modified: false,
        isBounce: true,
        fromAddr: parsed.from?.text ?? '',
      };
    }
    log(` Looking up Message-ID: ${rawMessageId}`, 'INFO', workerName);
    const bounceInfo = await this.dynamodb.getBounceInfo(rawMessageId, workerName);
    if (!bounceInfo) {
      // No record found after retries — leave the message as-is.
      return {
        rawBytes,
        modified: false,
        isBounce: true,
        fromAddr: parsed.from?.text ?? '',
      };
    }
    // Log bounce details
    log(`✓ Found bounce info:`, 'INFO', workerName);
    log(` Original sender: ${bounceInfo.original_source}`, 'INFO', workerName);
    log(` Bounce type: ${bounceInfo.bounceType}/${bounceInfo.bounceSubType}`, 'INFO', workerName);
    log(` Bounced recipients: ${bounceInfo.bouncedRecipients}`, 'INFO', workerName);
    if (!bounceInfo.bouncedRecipients.length) {
      log('⚠ No bounced recipients found in bounce info', 'WARNING', workerName);
      return {
        rawBytes,
        modified: false,
        isBounce: true,
        fromAddr: parsed.from?.text ?? '',
      };
    }
    // Present the bounce as coming from the first bounced recipient.
    const newFrom = bounceInfo.bouncedRecipients[0];
    // Rewrite headers in raw bytes
    let modifiedBytes = rawBytes;
    const originalFrom = getHeader(parsed, 'from');
    // Replace From header
    modifiedBytes = replaceHeader(modifiedBytes, 'From', newFrom);
    // Add diagnostic headers (original SES From, bounce classification)
    modifiedBytes = addHeader(modifiedBytes, 'X-Original-SES-From', originalFrom);
    modifiedBytes = addHeader(
      modifiedBytes,
      'X-Bounce-Type',
      `${bounceInfo.bounceType}/${bounceInfo.bounceSubType}`,
    );
    // Add Reply-To if not present
    if (!getHeader(parsed, 'reply-to')) {
      modifiedBytes = addHeader(modifiedBytes, 'Reply-To', newFrom);
    }
    // Adjust subject for generic delivery status notifications
    const subjectLower = subject.toLowerCase();
    if (
      subjectLower.includes('delivery status notification') ||
      subjectLower.includes('thanks for your submission')
    ) {
      modifiedBytes = replaceHeader(
        modifiedBytes,
        'Subject',
        `Delivery Status: ${newFrom}`,
      );
    }
    log(`✓ Rewritten FROM: ${newFrom}`, 'SUCCESS', workerName);
    return {
      rawBytes: modifiedBytes,
      modified: true,
      isBounce: true,
      fromAddr: newFrom,
    };
  }
}
// ---------------------------------------------------------------------------
// Raw header manipulation helpers
// ---------------------------------------------------------------------------
/**
 * Replace a header value in raw MIME bytes.
 * Handles multi-line (folded) headers: the match consumes the header line
 * plus any continuation lines starting with whitespace (RFC 5322 folding).
 * Returns the buffer unchanged when the header is not present.
 *
 * FIX: the previous version also built a second regex that was never used;
 * that dead code has been removed.
 */
function replaceHeader(raw: Buffer, name: string, newValue: string): Buffer {
  const str = raw.toString('utf-8');
  // Escape the header name so it can be embedded literally in the pattern.
  const escapedName = name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  // Header line plus any folded continuation lines.
  const foldedRegex = new RegExp(
    `^${escapedName}:[ \\t]*[^\\r\\n]*(?:\\r?\\n[ \\t]+[^\\r\\n]*)*`,
    'im',
  );
  const match = foldedRegex.exec(str);
  if (!match) return raw;
  const before = str.slice(0, match.index);
  const after = str.slice(match.index + match[0].length);
  return Buffer.from(`${before}${name}: ${newValue}${after}`, 'utf-8');
}
/**
 * Add a new header line right before the header/body separator.
 *
 * FIX: the inserted line now uses the same newline convention as the
 * message's own header/body separator. The previous version always
 * inserted "\r\n", producing mixed line endings in LF-only messages.
 * Returns the buffer unchanged when no blank-line separator is found.
 */
function addHeader(raw: Buffer, name: string, value: string): Buffer {
  const str = raw.toString('utf-8');
  // Find the header/body boundary (first blank line)
  const sep = str.match(/\r?\n\r?\n/);
  if (!sep || sep.index === undefined) return raw;
  // Mirror the message's existing line-ending style.
  const nl = sep[0].startsWith('\r') ? '\r\n' : '\n';
  const before = str.slice(0, sep.index);
  const after = str.slice(sep.index);
  return Buffer.from(`${before}${nl}${name}: ${value}${after}`, 'utf-8');
}
/** Escape regex metacharacters so `s` can be embedded literally in a pattern. */
function escapeRegex(s: string): string {
  return s.replace(/[\\^$.*+?()[\]{}|]/g, '\\$&');
}

View File

@@ -0,0 +1,120 @@
/**
* Email parsing utilities
*
* Wraps `mailparser` for parsing raw MIME bytes and provides
* header sanitization (e.g. Microsoft's malformed Message-IDs).
*/
import { simpleParser, type ParsedMail } from 'mailparser';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** Plain-text and HTML body variants extracted from a parsed email. */
export interface BodyParts {
  /** Plain-text body; '(No body content)' placeholder when missing/empty. */
  text: string;
  /** HTML body, or null when the message has no HTML part. */
  html: string | null;
}
// ---------------------------------------------------------------------------
// Parser
// ---------------------------------------------------------------------------
/**
 * Parse raw email bytes into a ParsedMail object.
 * Applies pre-sanitization for known malformed headers before parsing
 * (currently bracketed Microsoft Message-IDs — see sanitizeRawHeaders).
 */
export async function parseEmail(raw: Buffer): Promise<ParsedMail> {
  // Pre-sanitize: fix Microsoft's [uuid]@domain Message-IDs
  const sanitized = sanitizeRawHeaders(raw);
  return simpleParser(sanitized);
}
/**
 * Extract text and HTML body parts from a parsed email.
 * An absent or whitespace-only text part becomes a placeholder string;
 * an absent HTML part becomes null.
 */
export function extractBodyParts(parsed: ParsedMail): BodyParts {
  return {
    text: parsed.text?.trim() || '(No body content)',
    html: parsed.html || null,
  };
}
/**
 * Check if email was already processed by our worker (loop detection).
 *
 * Any message we have touched carries the x-ses-worker-processed header,
 * so its presence alone is the signal. The previous implementation also
 * tested `auto-submitted === 'auto-replied'`, but only ANDed with the same
 * header check, making that branch logically redundant (A || (B && A) ≡ A);
 * the dead condition has been removed without changing behavior.
 */
export function isProcessedByWorker(parsed: ParsedMail): boolean {
  return Boolean(parsed.headers.get('x-ses-worker-processed'));
}
/**
 * Check if email is a SES MAILER-DAEMON bounce notification.
 * Both markers must appear in the (lower-cased) From header text.
 */
export function isSesBounceNotification(parsed: ParsedMail): boolean {
  const sender = (parsed.from?.text ?? '').toLowerCase();
  return ['mailer-daemon@', 'amazonses.com'].every((marker) =>
    sender.includes(marker),
  );
}
/**
 * Get a header value as a string. Handles mailparser's headers Map, whose
 * values may be strings, structured objects (carrying a .text field),
 * or other primitives.
 */
export function getHeader(parsed: ParsedMail, name: string): string {
  const value = parsed.headers.get(name.toLowerCase());
  if (value == null) return '';
  switch (typeof value) {
    case 'string':
      return value;
    case 'object':
      // Structured header objects expose their rendered form as .text.
      return 'text' in value ? ((value as any).text ?? '') : String(value);
    default:
      return String(value);
  }
}
// ---------------------------------------------------------------------------
// Raw header sanitization
// ---------------------------------------------------------------------------
/**
 * Fix known problematic patterns in raw MIME headers BEFORE parsing.
 *
 * Specifically targets Microsoft's `Message-ID: <[uuid]@domain>` which
 * causes strict parsers to crash. Returns the original buffer unchanged
 * when no fix is needed.
 */
function sanitizeRawHeaders(raw: Buffer): Buffer {
  // We only need to check/fix the header section (before first blank line).
  // For efficiency we work on the first ~8KB where headers live.
  const headerEnd = findDoubleNewline(raw);
  const headerLen = headerEnd === -1 ? Math.min(raw.length, 8192) : headerEnd;
  // NOTE(review): decoding a byte-sliced prefix assumes the cut does not
  // fall inside a multi-byte UTF-8 sequence. Headers are normally ASCII,
  // but a non-ASCII byte straddling the 8KB fallback boundary would be
  // corrupted on re-encode — confirm this is acceptable.
  const headerStr = raw.subarray(0, headerLen).toString('utf-8');
  // Fix: Message-ID with square brackets <[...]@...>
  if (headerStr.includes('[') || headerStr.includes(']')) {
    const fixed = headerStr.replace(
      /^(Message-ID:\s*<?)(\[.*?\])(@[^>]*>?\s*)$/im,
      (_match, prefix, bracketed, suffix) =>
        // Strip only the brackets; keep the id and domain intact.
        prefix + bracketed.replace(/\[/g, '').replace(/\]/g, '') + suffix,
    );
    if (fixed !== headerStr) {
      // Re-attach the untouched remainder of the message.
      return Buffer.concat([
        Buffer.from(fixed, 'utf-8'),
        raw.subarray(headerLen),
      ]);
    }
  }
  return raw;
}
/**
 * Find the byte offset of the first blank line (\r\n\r\n or \n\n),
 * i.e. the header/body separator. Returns -1 when none exists.
 *
 * FIX: the previous loop bound (`i < buf.length - 3`) skipped the last
 * few bytes, so a bare "\n\n" at or near the end of the buffer was
 * missed (e.g. Buffer.from('a\n\n') returned -1). The two patterns are
 * now bounds-checked independently.
 */
function findDoubleNewline(buf: Buffer): number {
  for (let i = 0; i + 1 < buf.length; i++) {
    // \n\n — needs only two bytes of lookahead.
    if (buf[i] === 0x0a && buf[i + 1] === 0x0a) {
      return i;
    }
    // \r\n\r\n — needs four bytes of lookahead.
    if (
      i + 3 < buf.length &&
      buf[i] === 0x0d &&
      buf[i + 1] === 0x0a &&
      buf[i + 2] === 0x0d &&
      buf[i + 3] === 0x0a
    ) {
      return i;
    }
  }
  return -1;
}

View File

@@ -0,0 +1,306 @@
/**
* Email rules processing (Auto-Reply / OOO and Forwarding)
 *
 * CLEANED UP & FIXED:
* - Uses MailComposer for ALL message generation (safer MIME handling)
* - Fixes broken attachment forwarding
* - Removed legacy SMTP forwarding
* - Removed manual string concatenation for MIME boundaries
*/
import { createTransport } from 'nodemailer';
import type { ParsedMail } from 'mailparser';
import type { SESHandler } from '../aws/ses.js';
import { extractBodyParts } from './parser.js';
import { log } from '../logger.js';
// We use MailComposer directly to build the raw message bytes
import MailComposer from 'nodemailer/lib/mail-composer/index.js';
import { DynamoDBHandler, EmailRule } from '../aws/dynamodb.js';
import { config, isInternalAddress } from '../config.js';
/** Invoked when a rule action fires so the caller can bump per-domain counters. */
export type MetricsCallback = (action: 'autoreply' | 'forward', domain: string) => void;
export class RulesProcessor {
  /**
   * Applies per-recipient DynamoDB rules — out-of-office auto-replies and
   * forwarding. Rules never suppress local delivery.
   *
   * @param dynamodb - Rule lookup (email-rules table).
   * @param ses - Outbound delivery path for external addresses.
   */
  constructor(
    private dynamodb: DynamoDBHandler,
    private ses: SESHandler,
  ) {}
  /**
   * Process OOO and Forward rules for a single recipient.
   *
   * @param recipient - Recipient address whose rules are looked up (lowercased).
   * @param parsed - Parsed incoming message.
   * @param rawBytes - Raw message bytes (currently unused; kept for interface stability).
   * @param domain - Domain label used for metrics.
   * @param workerName - Logging context.
   * @param metricsCallback - Optional counter hook for 'autoreply'/'forward'.
   * @returns Always false — local delivery is never skipped by rules.
   */
  async processRulesForRecipient(
    recipient: string,
    parsed: ParsedMail,
    rawBytes: Buffer,
    domain: string,
    workerName: string,
    metricsCallback?: MetricsCallback,
  ): Promise<boolean> {
    const rule = await this.dynamodb.getEmailRules(recipient.toLowerCase());
    if (!rule) return false;
    const originalFrom = parsed.from?.text ?? '';
    const senderAddr = extractSenderAddress(originalFrom);
    // OOO / Auto-Reply
    if (rule.ooo_active) {
      await this.handleOoo(
        recipient,
        parsed,
        senderAddr,
        rule,
        domain,
        workerName,
        metricsCallback,
      );
    }
    // Forwarding
    const forwards = rule.forwards ?? [];
    if (forwards.length > 0) {
      await this.handleForwards(
        recipient,
        parsed,
        originalFrom,
        forwards,
        domain,
        workerName,
        metricsCallback,
      );
    }
    return false; // never skip local delivery
  }
  // -----------------------------------------------------------------------
  // OOO
  // -----------------------------------------------------------------------
  /**
   * Send an out-of-office auto-reply unless the incoming message looks
   * automated (Auto-Submitted header, bulk/junk/list precedence, or a
   * noreply/mailer-daemon sender).
   */
  private async handleOoo(
    recipient: string,
    parsed: ParsedMail,
    senderAddr: string,
    rule: EmailRule,
    domain: string,
    workerName: string,
    metricsCallback?: MetricsCallback,
  ): Promise<void> {
    // Don't reply to automatic messages
    const autoSubmitted = parsed.headers.get('auto-submitted');
    const precedence = String(parsed.headers.get('precedence') ?? '').toLowerCase();
    if (autoSubmitted && autoSubmitted !== 'no') {
      log(' ⏭ Skipping OOO for auto-submitted message', 'INFO', workerName);
      return;
    }
    if (['bulk', 'junk', 'list'].includes(precedence)) {
      log(` ⏭ Skipping OOO for ${precedence} message`, 'INFO', workerName);
      return;
    }
    if (/noreply|no-reply|mailer-daemon/i.test(senderAddr)) {
      log(' ⏭ Skipping OOO for noreply address', 'INFO', workerName);
      return;
    }
    try {
      const oooMsg = (rule.ooo_message as string) ?? 'I am out of office.';
      const contentType = (rule.ooo_content_type as string) ?? 'text';
      // FIX: Use MailComposer via await
      const oooBuffer = await buildOooReply(parsed, recipient, oooMsg, contentType);
      if (isInternalAddress(senderAddr)) {
        const ok = await sendInternalEmail(recipient, senderAddr, oooBuffer, workerName);
        if (ok) log(`✓ Sent OOO reply internally to ${senderAddr}`, 'SUCCESS', workerName);
        else log(`⚠ Internal OOO reply failed to ${senderAddr}`, 'WARNING', workerName);
      } else {
        const ok = await this.ses.sendRawEmail(recipient, senderAddr, oooBuffer, workerName);
        if (ok) log(`✓ Sent OOO reply externally to ${senderAddr} via SES`, 'SUCCESS', workerName);
        // FIX: external-path failures were previously dropped silently,
        // unlike the internal path — log them so operators can see them.
        else log(`⚠ External OOO reply failed to ${senderAddr} via SES`, 'WARNING', workerName);
      }
      metricsCallback?.('autoreply', domain);
    } catch (err: any) {
      log(`⚠ OOO reply failed to ${senderAddr}: ${err.message ?? err}`, 'ERROR', workerName);
    }
  }
  // -----------------------------------------------------------------------
  // Forwarding
  // -----------------------------------------------------------------------
  /**
   * Forward a copy of the message to each configured address. Each forward
   * is attempted independently; one failure does not stop the others.
   */
  private async handleForwards(
    recipient: string,
    parsed: ParsedMail,
    originalFrom: string,
    forwards: string[],
    domain: string,
    workerName: string,
    metricsCallback?: MetricsCallback,
  ): Promise<void> {
    for (const forwardTo of forwards) {
      try {
        // FIX: Correctly await the composer result
        const fwdBuffer = await buildForwardMessage(parsed, recipient, forwardTo, originalFrom);
        if (isInternalAddress(forwardTo)) {
          const ok = await sendInternalEmail(recipient, forwardTo, fwdBuffer, workerName);
          if (ok) log(`✓ Forwarded internally to ${forwardTo}`, 'SUCCESS', workerName);
          else log(`⚠ Internal forward failed to ${forwardTo}`, 'WARNING', workerName);
        } else {
          const ok = await this.ses.sendRawEmail(recipient, forwardTo, fwdBuffer, workerName);
          if (ok) log(`✓ Forwarded externally to ${forwardTo} via SES`, 'SUCCESS', workerName);
          // FIX: log external forward failures instead of dropping them.
          else log(`⚠ External forward failed to ${forwardTo} via SES`, 'WARNING', workerName);
        }
        metricsCallback?.('forward', domain);
      } catch (err: any) {
        log(`⚠ Forward failed to ${forwardTo}: ${err.message ?? err}`, 'ERROR', workerName);
      }
    }
  }
}
// ---------------------------------------------------------------------------
// Message building (Using Nodemailer MailComposer for Safety)
// ---------------------------------------------------------------------------
/**
 * Build the raw RFC 822 bytes for an out-of-office auto-reply.
 *
 * @param original - Parsed incoming message being replied to.
 * @param recipient - Local out-of-office address (becomes the From header).
 * @param oooMsg - Configured auto-reply text.
 * @param contentType - 'html' forces an HTML part; otherwise HTML is only
 *   included when the original itself carried an HTML body.
 * @returns Raw message bytes ready for SMTP/SES submission.
 */
async function buildOooReply(
  original: ParsedMail,
  recipient: string,
  oooMsg: string,
  contentType: string,
): Promise<Buffer> {
  const { text: textBody, html: htmlBody } = extractBodyParts(original);
  const originalSubject = original.subject ?? '(no subject)';
  const originalFrom = original.from?.text ?? 'unknown';
  const originalMsgId = original.messageId ?? '';
  // FIX: guard against a malformed recipient without '@' — previously the
  // generated Message-ID would end in "@undefined".
  const recipientDomain = recipient.split('@')[1] ?? 'localhost';
  // Text version
  let textContent = `${oooMsg}\n\n--- Original Message ---\n`;
  textContent += `From: ${originalFrom}\n`;
  textContent += `Subject: ${originalSubject}\n\n`;
  textContent += textBody;
  // HTML version
  let htmlContent = `<div>${oooMsg}</div><br><hr><br>`;
  htmlContent += '<strong>Original Message</strong><br>';
  htmlContent += `<strong>From:</strong> ${originalFrom}<br>`;
  htmlContent += `<strong>Subject:</strong> ${originalSubject}<br><br>`;
  htmlContent += htmlBody ? htmlBody : textBody.replace(/\n/g, '<br>');
  const includeHtml = contentType === 'html' || !!htmlBody;
  const composer = new MailComposer({
    from: recipient,
    to: originalFrom,
    subject: `Out of Office: ${originalSubject}`,
    // FIX: only set threading headers when the original actually had a
    // Message-ID — an empty inReplyTo/references emitted bogus headers.
    ...(originalMsgId
      ? { inReplyTo: originalMsgId, references: [originalMsgId] } // Nodemailer wants array
      : {}),
    text: textContent,
    html: includeHtml ? htmlContent : undefined,
    headers: {
      'Auto-Submitted': 'auto-replied', // mark the reply itself as automatic
      'X-SES-Worker-Processed': 'ooo-reply', // loop-prevention marker
    },
    messageId: `<${Date.now()}.${Math.random().toString(36).slice(2)}@${recipientDomain}>`
  });
  return composer.compile().build();
}
async function buildForwardMessage(
original: ParsedMail,
recipient: string,
forwardTo: string,
originalFrom: string,
): Promise<Buffer> {
const { text: textBody, html: htmlBody } = extractBodyParts(original);
const originalSubject = original.subject ?? '(no subject)';
const originalDate = original.date?.toUTCString() ?? 'unknown';
// Text version
let fwdText = '---------- Forwarded message ---------\n';
fwdText += `From: ${originalFrom}\n`;
fwdText += `Date: ${originalDate}\n`;
fwdText += `Subject: ${originalSubject}\n`;
fwdText += `To: ${recipient}\n\n`;
fwdText += textBody;
// HTML version
let fwdHtml: string | undefined;
if (htmlBody) {
fwdHtml = "<div style='border-left:3px solid #ccc;padding-left:10px;'>";
fwdHtml += '<strong>---------- Forwarded message ---------</strong><br>';
fwdHtml += `<strong>From:</strong> ${originalFrom}<br>`;
fwdHtml += `<strong>Date:</strong> ${originalDate}<br>`;
fwdHtml += `<strong>Subject:</strong> ${originalSubject}<br>`;
fwdHtml += `<strong>To:</strong> ${recipient}<br><br>`;
fwdHtml += htmlBody;
fwdHtml += '</div>';
}
// Config object for MailComposer
const mailOptions: any = {
from: recipient,
to: forwardTo,
subject: `FWD: ${originalSubject}`,
replyTo: originalFrom,
text: fwdText,
html: fwdHtml,
headers: {
'X-SES-Worker-Processed': 'forwarded',
},
};
// Attachments
if (original.attachments && original.attachments.length > 0) {
mailOptions.attachments = original.attachments.map((att) => ({
filename: att.filename ?? 'attachment',
content: att.content,
contentType: att.contentType,
cid: att.cid ?? undefined,
contentDisposition: att.contentDisposition || 'attachment'
}));
}
const composer = new MailComposer(mailOptions);
return composer.compile().build();
}
// ---------------------------------------------------------------------------
// Internal SMTP delivery (port 25, bypasses transport_maps)
// ---------------------------------------------------------------------------
/**
 * Deliver a raw message to the local mail server on the internal SMTP port
 * (bypasses transport_maps). Uses a fresh one-shot transport per call.
 *
 * @returns true on successful submission, false on any error (logged).
 */
async function sendInternalEmail(
  from: string,
  to: string,
  rawMessage: Buffer,
  workerName: string,
): Promise<boolean> {
  // FIX: close the transport in `finally` so the connection is released even
  // when sendMail throws — previously close() was skipped on the error path.
  let transport: ReturnType<typeof createTransport> | null = null;
  try {
    transport = createTransport({
      host: config.smtpHost,
      port: config.internalSmtpPort,
      secure: false,
      tls: { rejectUnauthorized: false },
    });
    await transport.sendMail({
      envelope: { from, to: [to] },
      raw: rawMessage,
    });
    return true;
  } catch (err: any) {
    log(` ✗ Internal delivery failed to ${to}: ${err.message ?? err}`, 'ERROR', workerName);
    return false;
  } finally {
    transport?.close();
  }
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Pull the bare address out of a From header ("Name <a@b>" → "a@b"); returns the input unchanged when no angle brackets are present. */
function extractSenderAddress(fromHeader: string): string {
  const angled = /<([^>]+)>/.exec(fromHeader);
  return angled === null ? fromHeader : angled[1];
}

View File

@@ -0,0 +1,48 @@
/**
* Health check HTTP server
*
* Provides a simple /health endpoint for Docker healthcheck
* and monitoring. Returns domain list and feature flags.
*/
import { createServer, type Server } from 'node:http';
import { log } from './logger.js';
/**
 * Start the /health HTTP endpoint used by the Docker healthcheck.
 * Every request (any path) receives a 200 JSON status document.
 *
 * @param port - TCP port to listen on.
 * @param domains - Configured domain list (echoed in the payload).
 * @param getStats - Optional provider for live worker statistics.
 */
export function startHealthServer(
  port: number,
  domains: string[],
  getStats?: () => any,
): Server {
  const buildPayload = () => ({
    status: 'healthy',
    worker: 'unified-email-worker-ts',
    version: '2.0.0',
    domains,
    domainCount: domains.length,
    features: {
      bounce_handling: true,
      ooo_replies: true,
      forwarding: true,
      blocklist: true,
      prometheus_metrics: true,
      lmtp: false,
      legacy_smtp_forward: false,
    },
    stats: getStats?.() ?? {},
    uptime: process.uptime(),
    timestamp: new Date().toISOString(),
  });
  const server = createServer((_req, res) => {
    const body = JSON.stringify(buildPayload(), null, 2);
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(body);
  });
  server.listen(port, () => log(`Health check on port ${port}`));
  return server;
}

View File

@@ -0,0 +1,150 @@
/**
* Structured logging for email worker with daily rotation AND retention
*
* Uses pino for high-performance JSON logging.
* Includes logic to delete logs older than X days.
*/
import pino from 'pino';
import {
existsSync,
mkdirSync,
createWriteStream,
type WriteStream,
readdirSync,
statSync,
unlinkSync
} from 'node:fs';
import { join } from 'node:path';
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
// Directory and file-name prefix for the plain-text daily log files.
const LOG_DIR = '/var/log/email-worker';
const LOG_FILE_PREFIX = 'worker';
const RETENTION_DAYS = 14; // delete log files older than 14 days
// ---------------------------------------------------------------------------
// File stream & Retention Logic
// ---------------------------------------------------------------------------
// Currently open stream for today's file, and the date string it was opened for.
let fileStream: WriteStream | null = null;
let currentDateStr = '';
/** Current UTC calendar date formatted as YYYY-MM-DD. */
function getDateStr(): string {
  return new Date().toISOString().substring(0, 10);
}
/**
 * Removes rotated log files whose modification time is older than
 * RETENTION_DAYS. Best-effort: per-file errors (file already deleted,
 * permission denied) are silently skipped.
 */
function cleanUpOldLogs(): void {
  try {
    if (!existsSync(LOG_DIR)) return;
    const cutoff = Date.now() - RETENTION_DAYS * 24 * 60 * 60 * 1000;
    // Only consider files that match our naming scheme.
    const candidates = readdirSync(LOG_DIR).filter(
      (name) => name.startsWith(LOG_FILE_PREFIX) && name.endsWith('.log'),
    );
    for (const name of candidates) {
      const fullPath = join(LOG_DIR, name);
      try {
        if (statSync(fullPath).mtimeMs < cutoff) {
          unlinkSync(fullPath);
          // One-off stdout note so operators can see that cleanup happened.
          process.stdout.write(`[INFO] Deleted old log file: ${name}\n`);
        }
      } catch {
        // File vanished or access denied — skip it.
      }
    }
  } catch (err) {
    process.stderr.write(`[WARN] Failed to clean up old logs: ${err}\n`);
  }
}
/**
 * Returns an append stream for today's log file, rotating (and cleaning up
 * old files) when the calendar date changes. Returns null when file logging
 * is unavailable (e.g. the log directory cannot be created).
 */
function ensureFileStream(): WriteStream | null {
  const today = getDateStr();
  // Fast path: a stream for today's date is already open.
  if (fileStream !== null && currentDateStr === today) {
    return fileStream;
  }
  try {
    if (!existsSync(LOG_DIR)) {
      mkdirSync(LOG_DIR, { recursive: true });
    }
    // First call or day rollover: purge expired log files before rotating.
    if (currentDateStr !== today) {
      cleanUpOldLogs();
    }
    // Close the previous day's stream before opening a new one.
    fileStream?.end();
    const filePath = join(LOG_DIR, `${LOG_FILE_PREFIX}.${today}.log`);
    fileStream = createWriteStream(filePath, { flags: 'a' });
    currentDateStr = today;
    return fileStream;
  } catch {
    // Silently continue without file logging (e.g. permission issue)
    return null;
  }
}
// ---------------------------------------------------------------------------
// Pino logger
// ---------------------------------------------------------------------------
// Single process-wide pino instance writing JSON lines to stdout.
const logger = pino({
  level: 'info',
  formatters: {
    // Emit the level as its label (e.g. "info") instead of pino's numeric value.
    level(label) {
      return { level: label };
    },
  },
  timestamp: pino.stdTimeFunctions.isoTime,
});
// ---------------------------------------------------------------------------
// Log level mapping
// ---------------------------------------------------------------------------
// Worker-facing log levels (Python-style names); SUCCESS is an INFO alias.
type LogLevel = 'DEBUG' | 'INFO' | 'WARNING' | 'ERROR' | 'CRITICAL' | 'SUCCESS';
// Maps worker levels onto pino logger method names.
const LEVEL_MAP: Record<LogLevel, keyof pino.Logger> = {
  DEBUG: 'debug',
  INFO: 'info',
  WARNING: 'warn',
  ERROR: 'error',
  CRITICAL: 'fatal',
  SUCCESS: 'info',
};
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/**
 * Emit a log line to both pino (JSON on stdout) and the rotating plain-text
 * daily file (best-effort; skipped when the file stream is unavailable).
 *
 * @param message - Human-readable message text.
 * @param level - Worker-style level, mapped onto pino levels via LEVEL_MAP.
 * @param workerName - Component tag prepended to every line.
 */
export function log(
  message: string,
  level: LogLevel = 'INFO',
  workerName = 'unified-worker',
): void {
  const successTag = level === 'SUCCESS' ? '[SUCCESS] ' : '';
  // Structured JSON output via pino.
  const pinoMethod = LEVEL_MAP[level] ?? 'info';
  (logger as any)[pinoMethod](`[${workerName}] ${successTag}${message}`);
  // Plain-text daily file.
  const stream = ensureFileStream();
  if (stream !== null) {
    const ts = new Date().toISOString().replace('T', ' ').slice(0, 19);
    stream.write(`[${ts}] [${level}] [${workerName}] ${successTag}${message}\n`);
  }
}

View File

@@ -0,0 +1,89 @@
/**
* Main entry point for unified email worker
*
* Startup sequence:
* 1. Load configuration and domains
* 2. Start Prometheus metrics server
* 3. Start health check server
* 4. Initialize UnifiedWorker
* 5. Register signal handlers for graceful shutdown
*/
import { config, loadDomains } from './config.js';
import { log } from './logger.js';
import { startMetricsServer, type MetricsCollector } from './metrics.js';
import { startHealthServer } from './health.js';
import { UnifiedWorker } from './worker/unified-worker.js';
// ---------------------------------------------------------------------------
// Banner
// ---------------------------------------------------------------------------
/** Print the startup banner: version box, configured domains, key config values. */
function printBanner(domains: string[]): void {
  const lines: string[] = [
    '╔══════════════════════════════════════════════════╗',
    '║ Unified Email Worker (TypeScript) ║',
    '║ Version 2.0.0 ║',
    '╚══════════════════════════════════════════════════╝',
    '',
    `Domains (${domains.length}):`,
    ...domains.map((d) => `${d}`),
    '',
    `SMTP: ${config.smtpHost}:${config.smtpPort}`,
    `Internal SMTP: port ${config.internalSmtpPort}`,
    `Poll interval: ${config.pollInterval}s`,
    `Metrics: port ${config.metricsPort}`,
    `Health: port ${config.healthPort}`,
    '',
  ];
  for (const line of lines) {
    log(line);
  }
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
/**
 * Boot sequence: load domains, start the metrics and health servers, create
 * the unified worker, install signal handlers, then start polling.
 * Exits the process with code 1 when no domains are configured.
 */
async function main(): Promise<void> {
  // 1. Load domains
  const domains = loadDomains();
  if (domains.length === 0) {
    log('❌ No domains configured. Set DOMAINS env var or provide DOMAINS_FILE.', 'ERROR');
    process.exit(1);
  }
  printBanner(domains);
  // 2. Metrics server (returns null when prom-client is unavailable)
  const metrics: MetricsCollector | null = await startMetricsServer(config.metricsPort);
  // 3. Unified worker
  const worker = new UnifiedWorker(domains, metrics);
  // 4. Health server (stats closure reads live worker state per request)
  startHealthServer(config.healthPort, domains, () => worker.getStats());
  // 5. Signal handlers
  let shuttingDown = false;
  const shutdown = async (signal: string) => {
    if (shuttingDown) return; // ignore a second signal during shutdown
    shuttingDown = true;
    log(`\n🛑 Received ${signal}. Shutting down gracefully...`);
    await worker.stop();
    log('👋 Goodbye.');
    process.exit(0);
  };
  process.on('SIGINT', () => shutdown('SIGINT'));
  process.on('SIGTERM', () => shutdown('SIGTERM'));
  // 6. Start
  await worker.start();
  // Keep alive (event loop stays open due to HTTP servers + SQS polling)
  log('✅ Worker is running. Press Ctrl+C to stop.');
}
// ---------------------------------------------------------------------------
// Top-level entry point: any startup rejection is fatal and logged before exit.
main().catch((err) => {
  log(`💥 Fatal startup error: ${err.message ?? err}`, 'CRITICAL');
  log(err.stack ?? '', 'CRITICAL');
  process.exit(1);
});

View File

@@ -0,0 +1,155 @@
/**
* Prometheus metrics collection
*
* Uses prom-client. Falls back gracefully if not available.
*/
import { log } from './logger.js';
import type * as PromClientTypes from 'prom-client';
// prom-client is optional — import dynamically
let promClient: typeof PromClientTypes | null = null;
try {
promClient = require('prom-client') as typeof PromClientTypes;
} catch {
// not installed
}
// ---------------------------------------------------------------------------
// Metric instances (created lazily if prom-client is available)
// ---------------------------------------------------------------------------
// Module-level metric handles. Left as `any` because prom-client may be
// absent; every call site uses optional chaining / truthiness guards so
// these stay safely undefined when metrics are disabled.
let emailsProcessed: any;
let emailsInFlight: any;
let processingTime: any;
let queueSize: any;
let bouncesProcessed: any;
let autorepliesSent: any;
let forwardsSent: any;
let blockedSenders: any;
/**
 * Instantiate all Prometheus metric objects into the module-level handles.
 * No-op when prom-client could not be loaded.
 */
function initMetrics(): void {
  if (promClient === null) return;
  const { Counter, Gauge, Histogram } = promClient;
  // Small helper: all counters here share the same constructor shape.
  const counter = (name: string, help: string, labelNames: string[]) =>
    new Counter({ name, help, labelNames });
  emailsProcessed = counter('emails_processed_total', 'Total emails processed', [
    'domain',
    'status',
  ]);
  emailsInFlight = new Gauge({
    name: 'emails_in_flight',
    help: 'Emails currently being processed',
  });
  processingTime = new Histogram({
    name: 'email_processing_seconds',
    help: 'Time to process email',
    labelNames: ['domain'],
  });
  queueSize = new Gauge({
    name: 'queue_messages_available',
    help: 'Messages in queue',
    labelNames: ['domain'],
  });
  bouncesProcessed = counter('bounces_processed_total', 'Bounce notifications processed', [
    'domain',
    'type',
  ]);
  autorepliesSent = counter('autoreplies_sent_total', 'Auto-replies sent', ['domain']);
  forwardsSent = counter('forwards_sent_total', 'Forwards sent', ['domain']);
  blockedSenders = counter('blocked_senders_total', 'Emails blocked by blacklist', ['domain']);
}
// ---------------------------------------------------------------------------
// MetricsCollector
// ---------------------------------------------------------------------------
/**
 * Thin facade over the module-level Prometheus metrics. Every method is a
 * safe no-op when prom-client is not installed (handles stay undefined).
 */
export class MetricsCollector {
  /** True when prom-client was loaded and metrics are live. */
  public readonly enabled: boolean;

  constructor() {
    this.enabled = !!promClient;
    if (this.enabled) initMetrics();
  }

  incrementProcessed(domain: string, status: string): void {
    if (emailsProcessed) emailsProcessed.labels(domain, status).inc();
  }

  incrementInFlight(): void {
    if (emailsInFlight) emailsInFlight.inc();
  }

  decrementInFlight(): void {
    if (emailsInFlight) emailsInFlight.dec();
  }

  observeProcessingTime(domain: string, seconds: number): void {
    if (processingTime) processingTime.labels(domain).observe(seconds);
  }

  setQueueSize(domain: string, size: number): void {
    if (queueSize) queueSize.labels(domain).set(size);
  }

  incrementBounce(domain: string, bounceType: string): void {
    if (bouncesProcessed) bouncesProcessed.labels(domain, bounceType).inc();
  }

  incrementAutoreply(domain: string): void {
    if (autorepliesSent) autorepliesSent.labels(domain).inc();
  }

  incrementForward(domain: string): void {
    if (forwardsSent) forwardsSent.labels(domain).inc();
  }

  incrementBlocked(domain: string): void {
    if (blockedSenders) blockedSenders.labels(domain).inc();
  }
}
// ---------------------------------------------------------------------------
// Start metrics HTTP server
// ---------------------------------------------------------------------------
/**
 * Start an HTTP server that serves the Prometheus registry and return the
 * collector. Returns null (metrics disabled) when prom-client is missing or
 * the server cannot be started; both cases are logged.
 */
export async function startMetricsServer(port: number): Promise<MetricsCollector | null> {
  if (promClient === null) {
    log('⚠ Prometheus client not installed, metrics disabled', 'WARNING');
    return null;
  }
  try {
    const { createServer } = await import('node:http');
    const { register } = promClient;
    const server = createServer(async (_req, res) => {
      try {
        res.setHeader('Content-Type', register.contentType);
        const body = await register.metrics();
        res.end(body);
      } catch {
        // Scrape failed — signal an internal error to the scraper.
        res.statusCode = 500;
        res.end();
      }
    });
    server.listen(port, () => log(`Prometheus metrics on port ${port}`));
    return new MetricsCollector();
  } catch (err: any) {
    log(`Failed to start metrics server: ${err.message ?? err}`, 'ERROR');
    return null;
  }
}

View File

@@ -0,0 +1,155 @@
/**
* SMTP / email delivery with nodemailer pooled transport
*
* Replaces both Python's SMTPPool and EmailDelivery classes.
* nodemailer handles connection pooling, keepalive, and reconnection natively.
*
* Removed: LMTP delivery path (never used in production).
*/
import { createTransport, type Transporter } from 'nodemailer';
import { log } from '../logger.js';
import { config } from '../config.js';
// ---------------------------------------------------------------------------
// Permanent error detection
// ---------------------------------------------------------------------------
// SMTP codes / phrases that mark a recipient failure as permanent.
const PERMANENT_INDICATORS = [
  '550', '551', '553',
  'mailbox not found', 'user unknown', 'no such user',
  'recipient rejected', 'does not exist', 'invalid recipient',
  'unknown user',
];
/** True when the error text contains any known permanent-failure indicator (case-insensitive). */
function isPermanentRecipientError(errorMsg: string): boolean {
  const needle = errorMsg.toLowerCase();
  for (const indicator of PERMANENT_INDICATORS) {
    if (needle.includes(indicator)) return true;
  }
  return false;
}
// ---------------------------------------------------------------------------
// Delivery class
// ---------------------------------------------------------------------------
export class EmailDelivery {
  // Pooled nodemailer transport shared by all deliveries from this instance.
  private transport: Transporter;
  /**
   * Creates the pooled SMTP transport from config. Authentication is only
   * attached when both smtpUser and smtpPass are configured.
   */
  constructor() {
    this.transport = createTransport({
      host: config.smtpHost,
      port: config.smtpPort,
      secure: config.smtpUseTls,
      pool: true,
      maxConnections: config.smtpPoolSize,
      maxMessages: Infinity, // reuse connections indefinitely
      tls: { rejectUnauthorized: false }, // NOTE(review): certificate checks disabled — presumably a local/trusted DMS; confirm
      ...(config.smtpUser && config.smtpPass
        ? { auth: { user: config.smtpUser, pass: config.smtpPass } }
        : {}),
    });
    log(
      `📡 SMTP pool initialized → ${config.smtpHost}:${config.smtpPort} ` +
      `(max ${config.smtpPoolSize} connections)`,
    );
  }
  /**
   * Send raw email to ONE recipient via the local DMS.
   *
   * Error classification, in order:
   *  1. responseCode >= 550 or a known permanent-error phrase → permanent,
   *     no retry.
   *  2. Connection-level errors (ECONNRESET/ECONNREFUSED/ETIMEDOUT or
   *     "disconnect"/"closed"/"connection" in the message) → retried up to
   *     maxRetries times with a 300 ms pause.
   *  3. Anything else → returned immediately, classified by message text.
   *
   * Returns: [success, errorMessage?, isPermanent]
   */
  async sendToRecipient(
    fromAddr: string,
    recipient: string,
    rawMessage: Buffer,
    workerName: string,
    maxRetries = 2,
  ): Promise<[boolean, string | null, boolean]> {
    let lastError: string | null = null;
    // attempt 0 is the first try; maxRetries additional attempts follow.
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        await this.transport.sendMail({
          envelope: { from: fromAddr, to: [recipient] },
          raw: rawMessage,
        });
        log(`✓ ${recipient}: Delivered (SMTP)`, 'SUCCESS', workerName);
        return [true, null, false];
      } catch (err: any) {
        const errorMsg = err.message ?? String(err);
        // responseCode is nodemailer's SMTP reply code when available.
        const responseCode = err.responseCode ?? 0;
        // Check for permanent errors (5xx)
        if (
          responseCode >= 550 ||
          isPermanentRecipientError(errorMsg)
        ) {
          log(
            `✗ ${recipient}: ${errorMsg} (permanent)`,
            'ERROR',
            workerName,
          );
          return [false, errorMsg, true];
        }
        // Connection-level errors → retry
        if (
          err.code === 'ECONNRESET' ||
          err.code === 'ECONNREFUSED' ||
          err.code === 'ETIMEDOUT' ||
          errorMsg.toLowerCase().includes('disconnect') ||
          errorMsg.toLowerCase().includes('closed') ||
          errorMsg.toLowerCase().includes('connection')
        ) {
          log(
            `⚠ ${recipient}: Connection error, retrying... ` +
            `(attempt ${attempt + 1}/${maxRetries + 1})`,
            'WARNING',
            workerName,
          );
          lastError = errorMsg;
          // Brief pause before the pooled transport reconnects.
          await sleep(300);
          continue;
        }
        // Other SMTP errors
        const isPerm = isPermanentRecipientError(errorMsg);
        log(
          `✗ ${recipient}: ${errorMsg} (${isPerm ? 'permanent' : 'temporary'})`,
          'ERROR',
          workerName,
        );
        return [false, errorMsg, isPerm];
      }
    }
    // All retries exhausted — report as a temporary (retryable) failure.
    log(
      `✗ ${recipient}: All retries failed - ${lastError}`,
      'ERROR',
      workerName,
    );
    return [false, lastError ?? 'Connection failed after retries', false];
  }
  /** Verify the transport is reachable (used during startup). */
  async verify(): Promise<boolean> {
    try {
      await this.transport.verify();
      return true;
    } catch {
      return false;
    }
  }
  /** Close all pooled connections. */
  close(): void {
    this.transport.close();
  }
}
// ---------------------------------------------------------------------------
/** Promise-based delay helper (resolves after `ms` milliseconds). */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

View File

@@ -0,0 +1,151 @@
/**
* Domain queue poller
*
* One poller per domain. Runs an async loop that long-polls SQS
* and dispatches messages to the MessageProcessor.
*
* Replaces Python's threading.Thread + threading.Event with
* a simple async loop + AbortController for cancellation.
*/
import type { SQSHandler } from '../aws/sqs.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
import { MessageProcessor } from './message-processor.js';
// Per-domain polling statistics, exposed through the worker's stats endpoint.
export interface DomainPollerStats {
  domain: string; // domain this poller serves
  processed: number; // messages successfully processed since start
  errors: number; // processing + polling errors since start
  lastActivity: Date | null; // time of the last successfully processed message
  running: boolean; // true while the poll loop is active
}
export class DomainPoller {
  // Live statistics; also doubles as the running/not-running flag.
  public stats: DomainPollerStats;
  // Cancellation signal checked at each stage of the poll loop.
  private abort: AbortController;
  // Promise of the in-flight loop; awaited by stop() for clean shutdown.
  private loopPromise: Promise<void> | null = null;
  /**
   * @param domain - Domain whose queue this poller drains.
   * @param queueUrl - SQS queue URL for the domain.
   * @param sqs - SQS access (receive/delete/size).
   * @param processor - Handles each received message.
   * @param metrics - Optional Prometheus collector (queue size, in-flight, timing).
   */
  constructor(
    private domain: string,
    private queueUrl: string,
    private sqs: SQSHandler,
    private processor: MessageProcessor,
    private metrics: MetricsCollector | null,
  ) {
    this.abort = new AbortController();
    this.stats = {
      domain,
      processed: 0,
      errors: 0,
      lastActivity: null,
      running: false,
    };
  }
  /** Start the polling loop. Returns immediately. */
  start(): void {
    if (this.stats.running) return; // already running — idempotent
    this.stats.running = true;
    log(`▶ Started poller for ${this.domain}`, 'INFO', `poller-${this.domain}`);
    this.loopPromise = this.pollLoop();
  }
  /** Signal the poller to stop and wait for it to finish. */
  async stop(): Promise<void> {
    if (!this.stats.running) return;
    this.abort.abort();
    // Wait for the current iteration to notice the abort and exit.
    if (this.loopPromise) {
      await this.loopPromise;
    }
    this.stats.running = false;
    log(`⏹ Stopped poller for ${this.domain}`, 'INFO', `poller-${this.domain}`);
  }
  // -----------------------------------------------------------------------
  // Poll loop
  // -----------------------------------------------------------------------
  /**
   * Main loop: report queue size, long-poll for messages, process each one,
   * and delete it from the queue when the processor says so. Polling errors
   * are counted and followed by a 5-second backoff.
   */
  private async pollLoop(): Promise<void> {
    const workerName = `poller-${this.domain}`;
    while (!this.abort.signal.aborted) {
      try {
        // Report queue size
        const qSize = await this.sqs.getQueueSize(this.queueUrl);
        this.metrics?.setQueueSize(this.domain, qSize);
        if (qSize > 0) {
          log(`📊 Queue ${this.domain}: ~${qSize} messages`, 'INFO', workerName);
        }
        // Long-poll
        const messages = await this.sqs.receiveMessages(this.queueUrl);
        if (this.abort.signal.aborted) break;
        if (messages.length === 0) continue;
        log(
          `📬 Received ${messages.length} message(s) for ${this.domain}`,
          'INFO',
          workerName,
        );
        // Process each message
        for (const msg of messages) {
          if (this.abort.signal.aborted) break;
          // How many times SQS has delivered this message (for retry logic downstream).
          const receiveCount = parseInt(
            msg.Attributes?.ApproximateReceiveCount ?? '1',
            10,
          );
          this.metrics?.incrementInFlight();
          const start = Date.now();
          try {
            const shouldDelete = await this.processor.processMessage(
              this.domain,
              msg,
              receiveCount,
            );
            // Only delete when processing reports success; otherwise SQS redelivers.
            if (shouldDelete && msg.ReceiptHandle) {
              await this.sqs.deleteMessage(this.queueUrl, msg.ReceiptHandle);
            }
            this.stats.processed++;
            this.stats.lastActivity = new Date();
            const elapsed = ((Date.now() - start) / 1000).toFixed(2);
            this.metrics?.observeProcessingTime(this.domain, parseFloat(elapsed));
          } catch (err: any) {
            this.stats.errors++;
            log(
              `❌ Error processing message: ${err.message ?? err}`,
              'ERROR',
              workerName,
            );
          } finally {
            this.metrics?.decrementInFlight();
          }
        }
      } catch (err: any) {
        if (this.abort.signal.aborted) break;
        this.stats.errors++;
        log(
          `❌ Polling error for ${this.domain}: ${err.message ?? err}`,
          'ERROR',
          workerName,
        );
        // Back off on repeated errors
        await sleep(5000);
      }
    }
  }
}
// ---------------------------------------------------------------------------
/** Promise-based delay helper (resolves after `ms` milliseconds). */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

View File

@@ -0,0 +1,346 @@
/**
* Email message processing worker
*
* Processes a single SQS message:
* 1. Unpack SNS/SES envelope
* 2. Download raw email from S3
* 3. Loop detection
* 4. Parse & sanitize headers
* 5. Bounce detection & header rewrite
* 6. Blocklist check
* 7. Process recipients (rules, SMTP delivery)
* 8. Mark result in S3 metadata
*/
import type { Message } from '@aws-sdk/client-sqs';
import type { S3Handler } from '../aws/s3.js';
import type { SQSHandler } from '../aws/sqs.js';
import type { SESHandler } from '../aws/ses.js';
import type { DynamoDBHandler } from '../aws/dynamodb.js';
import type { EmailDelivery } from '../smtp/delivery.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
import { BlocklistChecker } from '../email/blocklist.js';
import { BounceHandler } from '../email/bounce-handler.js';
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
import { RulesProcessor } from '../email/rules-processor.js';
// ---------------------------------------------------------------------------
// Processor
// ---------------------------------------------------------------------------
export class MessageProcessor {
private bounceHandler: BounceHandler;
private rulesProcessor: RulesProcessor;
private blocklist: BlocklistChecker;
public metrics: MetricsCollector | null = null;
  /**
   * Wires up the sub-processors (bounce handling, rules, blocklist) around
   * the shared AWS handlers. The metrics collector is injected separately
   * through the public `metrics` field.
   */
  constructor(
    private s3: S3Handler,
    private sqs: SQSHandler,
    private ses: SESHandler,
    private dynamodb: DynamoDBHandler,
    private delivery: EmailDelivery,
  ) {
    this.bounceHandler = new BounceHandler(dynamodb);
    this.rulesProcessor = new RulesProcessor(dynamodb, ses);
    this.blocklist = new BlocklistChecker(dynamodb);
  }
/**
* Process one email message from queue.
* Returns true → delete from queue.
* Returns false → leave in queue for retry.
*/
async processMessage(
domain: string,
message: Message,
receiveCount: number,
): Promise<boolean> {
const workerName = `worker-${domain}`;
try {
// 1. UNPACK (SNS → SES)
const body = JSON.parse(message.Body ?? '{}');
let sesMsg: any;
if (body.Message && body.Type) {
// SNS Notification wrapper
const snsContent = body.Message;
sesMsg = typeof snsContent === 'string' ? JSON.parse(snsContent) : snsContent;
} else {
sesMsg = body;
}
// 2. EXTRACT DATA
const mail = sesMsg.mail ?? {};
const receipt = sesMsg.receipt ?? {};
const messageId: string | undefined = mail.messageId;
// Ignore SES setup notifications
if (messageId === 'AMAZON_SES_SETUP_NOTIFICATION') {
log(' Received Amazon SES Setup Notification. Ignoring.', 'INFO', workerName);
return true;
}
const fromAddr: string = mail.source ?? '';
const recipients: string[] = receipt.recipients ?? [];
if (!messageId) {
log('❌ Error: No messageId in event payload', 'ERROR', workerName);
return true;
}
// Domain validation
if (recipients.length === 0) {
log('⚠ Warning: No recipients in event', 'WARNING', workerName);
return true;
}
const recipientDomain = recipients[0].split('@')[1];
if (recipientDomain.toLowerCase() !== domain.toLowerCase()) {
log(
`⚠ Security: Ignored message for ${recipientDomain} ` +
`(I am worker for ${domain})`,
'WARNING',
workerName,
);
return true;
}
// Compact log
const recipientsStr =
recipients.length === 1
? recipients[0]
: `${recipients.length} recipients`;
log(
`📧 Processing: ${messageId.slice(0, 20)}... -> ${recipientsStr}`,
'INFO',
workerName,
);
// 3. DOWNLOAD FROM S3
const rawBytes = await this.s3.getEmail(domain, messageId, receiveCount);
if (rawBytes === null) return false; // retry later
// 4. LOOP DETECTION
const tempParsed = await parseEmail(rawBytes);
const skipRules = isProcessedByWorker(tempParsed);
if (skipRules) {
log('🔄 Loop prevention: Already processed by worker', 'INFO', workerName);
}
// 5. PARSING & BOUNCE LOGIC
let finalRawBytes = rawBytes;
let fromAddrFinal = fromAddr;
let isBounce = false;
try {
const parsed = await parseEmail(rawBytes);
const subject = parsed.subject ?? '(no subject)';
// Bounce header rewriting
const bounceResult = await this.bounceHandler.applyBounceLogic(
parsed,
rawBytes,
subject,
workerName,
);
isBounce = bounceResult.isBounce;
finalRawBytes = bounceResult.rawBytes;
if (bounceResult.modified) {
log(' ✨ Bounce detected & headers rewritten via DynamoDB', 'INFO', workerName);
fromAddrFinal = bounceResult.fromAddr;
this.metrics?.incrementBounce(domain, 'rewritten');
} else {
fromAddrFinal = fromAddr;
}
// Add processing marker for non-processed emails
if (!skipRules) {
finalRawBytes = addProcessedHeader(finalRawBytes);
}
// Re-parse after modifications for rules processing
var parsedFinal = await parseEmail(finalRawBytes);
} catch (err: any) {
log(
`⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original.`,
'WARNING',
workerName,
);
log(`Full error: ${err.stack ?? err}`, 'ERROR', workerName);
fromAddrFinal = fromAddr;
isBounce = false;
var parsedFinal = await parseEmail(rawBytes);
}
// 6. BLOCKLIST CHECK
const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders(
recipients,
fromAddrFinal,
workerName,
);
// 7. PROCESS RECIPIENTS
log(`📤 Sending to ${recipients.length} recipient(s)...`, 'INFO', workerName);
const successful: string[] = [];
const failedPermanent: string[] = [];
const failedTemporary: string[] = [];
const blockedRecipients: string[] = [];
for (const recipient of recipients) {
// Blocked?
if (blockedByRecipient[recipient]) {
log(
`🗑 Silently dropping message for ${recipient} (Sender blocked)`,
'INFO',
workerName,
);
blockedRecipients.push(recipient);
this.metrics?.incrementBlocked(domain);
continue;
}
// Process rules (OOO, Forwarding) — not for bounces or already forwarded
if (!isBounce && !skipRules) {
const metricsCallback = (action: 'autoreply' | 'forward', dom: string) => {
if (action === 'autoreply') this.metrics?.incrementAutoreply(dom);
else if (action === 'forward') this.metrics?.incrementForward(dom);
};
await this.rulesProcessor.processRulesForRecipient(
recipient,
parsedFinal,
finalRawBytes,
domain,
workerName,
metricsCallback,
);
}
// SMTP delivery
const [success, error, isPerm] = await this.delivery.sendToRecipient(
fromAddrFinal,
recipient,
finalRawBytes,
workerName,
);
if (success) {
successful.push(recipient);
this.metrics?.incrementProcessed(domain, 'success');
} else if (isPerm) {
failedPermanent.push(recipient);
this.metrics?.incrementProcessed(domain, 'permanent_failure');
} else {
failedTemporary.push(recipient);
this.metrics?.incrementProcessed(domain, 'temporary_failure');
}
}
// 8. RESULT & CLEANUP
const totalHandled =
successful.length + failedPermanent.length + blockedRecipients.length;
if (totalHandled === recipients.length) {
if (blockedRecipients.length === recipients.length) {
// All blocked
try {
await this.s3.markAsBlocked(
domain,
messageId,
blockedRecipients,
fromAddrFinal,
workerName,
);
await this.s3.deleteBlockedEmail(domain, messageId, workerName);
} catch (err: any) {
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
return false;
}
} else if (successful.length > 0) {
await this.s3.markAsProcessed(
domain,
messageId,
workerName,
failedPermanent.length > 0 ? failedPermanent : undefined,
);
} else if (failedPermanent.length > 0) {
await this.s3.markAsAllInvalid(
domain,
messageId,
failedPermanent,
workerName,
);
}
// Summary
const parts: string[] = [];
if (successful.length) parts.push(`${successful.length} OK`);
if (failedPermanent.length) parts.push(`${failedPermanent.length} invalid`);
if (blockedRecipients.length) parts.push(`${blockedRecipients.length} blocked`);
log(`✅ Completed (${parts.join(', ')})`, 'SUCCESS', workerName);
return true;
} else {
// Temporary failures remain
log(
`🔄 Temp failure (${failedTemporary.length} failed), will retry`,
'WARNING',
workerName,
);
return false;
}
} catch (err: any) {
log(`❌ CRITICAL WORKER ERROR: ${err.message ?? err}`, 'ERROR', workerName);
log(err.stack ?? '', 'ERROR', workerName);
return false;
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
* Add X-SES-Worker-Processed header to raw email bytes using Buffer manipulation.
* More robust and memory efficient than toString().
*/
/**
 * Insert an `X-SES-Worker-Processed: delivered` header into raw email bytes
 * using Buffer manipulation (more robust and memory-efficient than toString()).
 *
 * The header is inserted immediately before the blank line separating headers
 * from the body. If no separator exists (malformed mail), the header is
 * prepended to the whole message instead.
 *
 * @param raw - Complete raw message bytes.
 * @returns A new Buffer containing the message with the marker header added.
 */
function addProcessedHeader(raw: Buffer): Buffer {
  // Header/body separator is the first blank line: CRLF CRLF, or bare LF LF
  // (Unix style, rare in email but possible). Buffer.indexOf scans the full
  // buffer, so a separator in the last bytes is found too — the previous
  // manual loop stopped 3 bytes early and could miss a trailing "\n\n".
  const crlfIdx = raw.indexOf('\r\n\r\n');
  const lfIdx = raw.indexOf('\n\n');
  // Take whichever separator occurs first; -1 means "not found".
  let headerEndIndex: number;
  if (crlfIdx === -1) {
    headerEndIndex = lfIdx;
  } else if (lfIdx === -1) {
    headerEndIndex = crlfIdx;
  } else {
    headerEndIndex = Math.min(crlfIdx, lfIdx);
  }
  // No header/body separation found (broken mail?) — just prepend the header.
  if (headerEndIndex === -1) {
    const headerLine = Buffer.from('X-SES-Worker-Processed: delivered\r\n', 'utf-8');
    return Buffer.concat([headerLine, raw]);
  }
  // Insert the header right BEFORE the blank line.
  const before = raw.subarray(0, headerEndIndex);
  const after = raw.subarray(headerEndIndex);
  const newHeader = Buffer.from('\r\nX-SES-Worker-Processed: delivered', 'utf-8');
  return Buffer.concat([before, newHeader, after]);
}

View File

@@ -0,0 +1,106 @@
/**
* Unified multi-domain worker coordinator
*
* Manages the lifecycle of all DomainPoller instances:
* - Resolves SQS queue URLs for each domain
* - Creates pollers for valid domains
* - Provides aggregate stats
* - Graceful shutdown
*/
import { DynamoDBHandler } from '../aws/dynamodb.js';
import { S3Handler } from '../aws/s3.js';
import { SQSHandler } from '../aws/sqs.js';
import { SESHandler } from '../aws/ses.js';
import { EmailDelivery } from '../smtp/delivery.js';
import { MessageProcessor } from './message-processor.js';
import { DomainPoller, type DomainPollerStats } from './domain-poller.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
/**
 * Unified multi-domain worker coordinator.
 *
 * Owns one DomainPoller per domain with a resolvable SQS queue, all sharing a
 * single MessageProcessor wired to the AWS handlers and SMTP delivery.
 */
export class UnifiedWorker {
  private pollers: DomainPoller[] = [];
  private processor: MessageProcessor;
  private sqs: SQSHandler;

  /**
   * @param domains - Domains this worker should poll for.
   * @param metrics - Optional metrics collector; null disables metrics.
   */
  constructor(
    private domains: string[],
    private metrics: MetricsCollector | null,
  ) {
    const s3 = new S3Handler();
    this.sqs = new SQSHandler();
    const ses = new SESHandler();
    const dynamodb = new DynamoDBHandler();
    const delivery = new EmailDelivery();
    this.processor = new MessageProcessor(s3, this.sqs, ses, dynamodb, delivery);
    this.processor.metrics = metrics;
    // Best-effort table verification; failures are deliberately non-fatal.
    void dynamodb.verifyTables().catch(() => {});
  }

  /**
   * Resolve the SQS queue URL for every domain, create a DomainPoller per
   * resolvable domain, and start all pollers.
   *
   * Exits the process when no domain has a usable queue.
   */
  async start(): Promise<void> {
    log(`🚀 Starting unified worker for ${this.domains.length} domain(s)...`);
    // Queue URL lookups are independent of each other — resolve concurrently
    // instead of awaiting one domain at a time.
    const resolved = await Promise.all(
      this.domains.map(async (domain) => ({
        domain,
        queueUrl: await this.sqs.getQueueUrl(domain),
      })),
    );
    const resolvedPollers: DomainPoller[] = [];
    for (const { domain, queueUrl } of resolved) {
      if (!queueUrl) {
        log(`⚠ Skipping ${domain}: No SQS queue found`, 'WARNING');
        continue;
      }
      resolvedPollers.push(
        new DomainPoller(domain, queueUrl, this.sqs, this.processor, this.metrics),
      );
    }
    if (resolvedPollers.length === 0) {
      log('❌ No valid domains with SQS queues found. Exiting.', 'ERROR');
      process.exit(1);
    }
    this.pollers = resolvedPollers;
    for (const poller of this.pollers) {
      poller.start();
    }
    log(
      `✅ All ${this.pollers.length} domain poller(s) running: ` +
        this.pollers.map((p) => p.stats.domain).join(', '),
      'SUCCESS',
    );
  }

  /** Stop every poller and wait until all of them have shut down. */
  async stop(): Promise<void> {
    log('🛑 Stopping all domain pollers...');
    await Promise.all(this.pollers.map((p) => p.stop()));
    log('✅ All pollers stopped.');
  }

  /** Aggregate processed/error counts across all pollers plus per-domain stats. */
  getStats(): {
    totalProcessed: number;
    totalErrors: number;
    domains: DomainPollerStats[];
  } {
    let totalProcessed = 0;
    let totalErrors = 0;
    const domains: DomainPollerStats[] = [];
    for (const p of this.pollers) {
      totalProcessed += p.stats.processed;
      totalErrors += p.stats.errors;
      // Copy so callers cannot mutate live poller stats.
      domains.push({ ...p.stats });
    }
    return { totalProcessed, totalErrors, domains };
  }
}