new structure

This commit is contained in:
2026-03-07 15:16:14 -06:00
parent a70ae78a93
commit d1426afec5
18 changed files with 3233 additions and 33 deletions

View File

@@ -0,0 +1,354 @@
/**
* Email message processing worker
*
* Processes a single SQS message:
* 1. Unpack SNS/SES envelope
* 2. Download raw email from S3
* 3. Loop detection
* 4. Parse & sanitize headers
* 5. Bounce detection & header rewrite
* 6. Blocklist check
* 7. Process recipients (rules, SMTP delivery)
* 8. Mark result in S3 metadata
*/
import type { Message } from '@aws-sdk/client-sqs';
import type { S3Handler } from '../aws/s3.js';
import type { SQSHandler } from '../aws/sqs.js';
import type { SESHandler } from '../aws/ses.js';
import type { DynamoDBHandler } from '../aws/dynamodb.js';
import type { EmailDelivery } from '../smtp/delivery.js';
import type { MetricsCollector } from '../metrics.js';
import { log } from '../logger.js';
import { BlocklistChecker } from '../email/blocklist.js';
import { BounceHandler } from '../email/bounce-handler.js';
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
import { RulesProcessor } from '../email/rules-processor.js';
// ---------------------------------------------------------------------------
// Processor
// ---------------------------------------------------------------------------
export class MessageProcessor {
private bounceHandler: BounceHandler;
private rulesProcessor: RulesProcessor;
private blocklist: BlocklistChecker;
public metrics: MetricsCollector | null = null;
constructor(
private s3: S3Handler,
private sqs: SQSHandler,
private ses: SESHandler,
private dynamodb: DynamoDBHandler,
private delivery: EmailDelivery,
) {
this.bounceHandler = new BounceHandler(dynamodb);
this.rulesProcessor = new RulesProcessor(dynamodb, ses);
this.blocklist = new BlocklistChecker(dynamodb);
}
/**
* Process one email message from queue.
* Returns true → delete from queue.
* Returns false → leave in queue for retry.
*/
async processMessage(
domain: string,
message: Message,
receiveCount: number,
): Promise<boolean> {
const workerName = `worker-${domain}`;
try {
// 1. UNPACK (SNS → SES)
const body = JSON.parse(message.Body ?? '{}');
let sesMsg: any;
if (body.Message && body.Type) {
// SNS Notification wrapper
const snsContent = body.Message;
sesMsg = typeof snsContent === 'string' ? JSON.parse(snsContent) : snsContent;
} else {
sesMsg = body;
}
// 2. EXTRACT DATA
const mail = sesMsg.mail ?? {};
const receipt = sesMsg.receipt ?? {};
const messageId: string | undefined = mail.messageId;
// Ignore SES setup notifications
if (messageId === 'AMAZON_SES_SETUP_NOTIFICATION') {
log(' Received Amazon SES Setup Notification. Ignoring.', 'INFO', workerName);
return true;
}
const fromAddr: string = mail.source ?? '';
const recipients: string[] = receipt.recipients ?? [];
if (!messageId) {
log('❌ Error: No messageId in event payload', 'ERROR', workerName);
return true;
}
// Domain validation
if (recipients.length === 0) {
log('⚠ Warning: No recipients in event', 'WARNING', workerName);
return true;
}
const recipientDomain = recipients[0].split('@')[1];
if (recipientDomain.toLowerCase() !== domain.toLowerCase()) {
log(
`⚠ Security: Ignored message for ${recipientDomain} ` +
`(I am worker for ${domain})`,
'WARNING',
workerName,
);
return true;
}
// Compact log
const recipientsStr =
recipients.length === 1
? recipients[0]
: `${recipients.length} recipients`;
log(
`📧 Processing: ${messageId.slice(0, 20)}... -> ${recipientsStr}`,
'INFO',
workerName,
);
// 3. DOWNLOAD FROM S3
const rawBytes = await this.s3.getEmail(domain, messageId, receiveCount);
if (rawBytes === null) return false; // retry later
// 4. LOOP DETECTION
const tempParsed = await parseEmail(rawBytes);
const skipRules = isProcessedByWorker(tempParsed);
if (skipRules) {
log('🔄 Loop prevention: Already processed by worker', 'INFO', workerName);
}
// 5. PARSING & BOUNCE LOGIC
let finalRawBytes = rawBytes;
let fromAddrFinal = fromAddr;
let isBounce = false;
try {
const parsed = await parseEmail(rawBytes);
const subject = parsed.subject ?? '(no subject)';
// Bounce header rewriting
const bounceResult = await this.bounceHandler.applyBounceLogic(
parsed,
rawBytes,
subject,
workerName,
);
isBounce = bounceResult.isBounce;
finalRawBytes = bounceResult.rawBytes;
if (bounceResult.modified) {
log(' ✨ Bounce detected & headers rewritten via DynamoDB', 'INFO', workerName);
fromAddrFinal = bounceResult.fromAddr;
this.metrics?.incrementBounce(domain, 'rewritten');
} else {
fromAddrFinal = fromAddr;
}
// Add processing marker for non-processed emails
if (!skipRules) {
finalRawBytes = addProcessedHeader(finalRawBytes);
}
// Re-parse after modifications for rules processing
var parsedFinal = await parseEmail(finalRawBytes);
} catch (err: any) {
log(
`⚠ Parsing/Logic Error: ${err.message ?? err}. Sending original.`,
'WARNING',
workerName,
);
log(`Full error: ${err.stack ?? err}`, 'ERROR', workerName);
fromAddrFinal = fromAddr;
isBounce = false;
var parsedFinal = await parseEmail(rawBytes);
}
// 6. BLOCKLIST CHECK
const sendersToCheck: string[] = [];
if (fromAddrFinal) sendersToCheck.push(fromAddrFinal);
const headerFrom = parsedFinal?.from?.text;
if (headerFrom && !sendersToCheck.includes(headerFrom)) {
sendersToCheck.push(headerFrom);
}
const blockedByRecipient = await this.blocklist.batchCheckBlockedSenders(
recipients,
sendersToCheck, // <-- Array übergeben
workerName,
);
// 7. PROCESS RECIPIENTS
log(`📤 Sending to ${recipients.length} recipient(s)...`, 'INFO', workerName);
const successful: string[] = [];
const failedPermanent: string[] = [];
const failedTemporary: string[] = [];
const blockedRecipients: string[] = [];
for (const recipient of recipients) {
// Blocked?
if (blockedByRecipient[recipient]) {
log(
`🗑 Silently dropping message for ${recipient} (Sender blocked)`,
'INFO',
workerName,
);
blockedRecipients.push(recipient);
this.metrics?.incrementBlocked(domain);
continue;
}
// Process rules (OOO, Forwarding) — not for bounces or already forwarded
if (!isBounce && !skipRules) {
const metricsCallback = (action: 'autoreply' | 'forward', dom: string) => {
if (action === 'autoreply') this.metrics?.incrementAutoreply(dom);
else if (action === 'forward') this.metrics?.incrementForward(dom);
};
await this.rulesProcessor.processRulesForRecipient(
recipient,
parsedFinal,
finalRawBytes,
domain,
workerName,
metricsCallback,
);
}
// SMTP delivery
const [success, error, isPerm] = await this.delivery.sendToRecipient(
fromAddrFinal,
recipient,
finalRawBytes,
workerName,
);
if (success) {
successful.push(recipient);
this.metrics?.incrementProcessed(domain, 'success');
} else if (isPerm) {
failedPermanent.push(recipient);
this.metrics?.incrementProcessed(domain, 'permanent_failure');
} else {
failedTemporary.push(recipient);
this.metrics?.incrementProcessed(domain, 'temporary_failure');
}
}
// 8. RESULT & CLEANUP
const totalHandled =
successful.length + failedPermanent.length + blockedRecipients.length;
if (totalHandled === recipients.length) {
if (blockedRecipients.length === recipients.length) {
// All blocked
try {
await this.s3.markAsBlocked(
domain,
messageId,
blockedRecipients,
fromAddrFinal,
workerName,
);
await this.s3.deleteBlockedEmail(domain, messageId, workerName);
} catch (err: any) {
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
return false;
}
} else if (successful.length > 0) {
await this.s3.markAsProcessed(
domain,
messageId,
workerName,
failedPermanent.length > 0 ? failedPermanent : undefined,
);
} else if (failedPermanent.length > 0) {
await this.s3.markAsAllInvalid(
domain,
messageId,
failedPermanent,
workerName,
);
}
// Summary
const parts: string[] = [];
if (successful.length) parts.push(`${successful.length} OK`);
if (failedPermanent.length) parts.push(`${failedPermanent.length} invalid`);
if (blockedRecipients.length) parts.push(`${blockedRecipients.length} blocked`);
log(`✅ Completed (${parts.join(', ')})`, 'SUCCESS', workerName);
return true;
} else {
// Temporary failures remain
log(
`🔄 Temp failure (${failedTemporary.length} failed), will retry`,
'WARNING',
workerName,
);
return false;
}
} catch (err: any) {
log(`❌ CRITICAL WORKER ERROR: ${err.message ?? err}`, 'ERROR', workerName);
log(err.stack ?? '', 'ERROR', workerName);
return false;
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
* Add X-SES-Worker-Processed header to raw email bytes using Buffer manipulation.
* More robust and memory efficient than toString().
*/
function addProcessedHeader(raw: Buffer): Buffer {
// Wir suchen nach dem Ende der Header: Double Newline (\r\n\r\n oder \n\n)
let headerEndIndex = -1;
// Effiziente Suche im Buffer
for (let i = 0; i < raw.length - 3; i++) {
// Check für \r\n\r\n
if (raw[i] === 0x0d && raw[i+1] === 0x0a && raw[i+2] === 0x0d && raw[i+3] === 0x0a) {
headerEndIndex = i;
break;
}
// Check für \n\n (Unix style, seltener bei E-Mail aber möglich)
if (raw[i] === 0x0a && raw[i+1] === 0x0a) {
headerEndIndex = i;
break;
}
}
// Falls keine Header-Trennung gefunden wurde (kaputte Mail?), hängen wir es einfach vorne an
if (headerEndIndex === -1) {
const headerLine = Buffer.from('X-SES-Worker-Processed: delivered\r\n', 'utf-8');
return Buffer.concat([headerLine, raw]);
}
// Wir fügen den Header VOR der leeren Zeile ein
const before = raw.subarray(0, headerEndIndex);
const after = raw.subarray(headerEndIndex);
const newHeader = Buffer.from('\r\nX-SES-Worker-Processed: delivered', 'utf-8');
return Buffer.concat([before, newHeader, after]);
}