Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9862689c0c | |||
| bed6c2a398 | |||
| 27c2be664a |
@@ -8,24 +8,38 @@ from botocore.exceptions import ClientError
|
|||||||
import time
|
import time
|
||||||
import random
|
import random
|
||||||
|
|
||||||
# Logging konfigurieren
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
logger.setLevel(logging.INFO)
|
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
sqs = boto3.client('sqs')
|
sqs = boto3.client('sqs')
|
||||||
|
sns = boto3.client('sns')
|
||||||
|
sts_account_id = None
|
||||||
|
|
||||||
# Retry-Konfiguration
|
|
||||||
MAX_RETRIES = 3
|
MAX_RETRIES = 3
|
||||||
BASE_BACKOFF = 1 # Sekunden
|
BASE_BACKOFF = 1
|
||||||
|
|
||||||
def exponential_backoff(attempt):
|
def exponential_backoff(attempt):
|
||||||
"""Exponential Backoff mit Jitter"""
|
|
||||||
return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1)
|
return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1)
|
||||||
|
|
||||||
|
def get_account_id():
|
||||||
|
global sts_account_id
|
||||||
|
if sts_account_id is None:
|
||||||
|
sts_account_id = boto3.client('sts').get_caller_identity()['Account']
|
||||||
|
return sts_account_id
|
||||||
|
|
||||||
|
def get_topic_arn(domain):
|
||||||
|
"""
|
||||||
|
Generiert Topic-ARN aus Domain.
|
||||||
|
Konvention: domain.tld -> domain-tld-topic
|
||||||
|
"""
|
||||||
|
topic_name = domain.replace('.', '-') + '-topic'
|
||||||
|
region = os.environ.get('AWS_REGION', 'us-east-2')
|
||||||
|
account_id = get_account_id()
|
||||||
|
return f"arn:aws:sns:{region}:{account_id}:{topic_name}"
|
||||||
|
|
||||||
def get_queue_url(domain):
|
def get_queue_url(domain):
|
||||||
"""
|
"""
|
||||||
Generiert Queue-Namen aus Domain und holt URL.
|
Fallback: Direkter SQS-Send für Domains ohne SNS-Topic.
|
||||||
Konvention: domain.tld -> domain-tld-queue
|
|
||||||
"""
|
"""
|
||||||
queue_name = domain.replace('.', '-') + '-queue'
|
queue_name = domain.replace('.', '-') + '-queue'
|
||||||
try:
|
try:
|
||||||
@@ -38,54 +52,91 @@ def get_queue_url(domain):
|
|||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def publish_to_sns(topic_arn, message_body, msg_id):
|
||||||
|
attempt = 0
|
||||||
|
while attempt < MAX_RETRIES:
|
||||||
|
try:
|
||||||
|
sns.publish(
|
||||||
|
TopicArn=topic_arn,
|
||||||
|
Message=message_body
|
||||||
|
)
|
||||||
|
logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}")
|
||||||
|
return True
|
||||||
|
except ClientError as e:
|
||||||
|
error_code = e.response['Error']['Code']
|
||||||
|
# Fallback auf SQS bei Topic-nicht-gefunden ODER fehlender Berechtigung
|
||||||
|
if error_code in ('NotFound', 'NotFoundException', 'AuthorizationError'):
|
||||||
|
logger.info(f"ℹ️ SNS unavailable for {topic_arn} ({error_code}) — falling back to SQS")
|
||||||
|
return False
|
||||||
|
attempt += 1
|
||||||
|
logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}")
|
||||||
|
if attempt == MAX_RETRIES:
|
||||||
|
raise
|
||||||
|
time.sleep(exponential_backoff(attempt))
|
||||||
|
return False
|
||||||
|
|
||||||
|
def send_to_sqs(queue_url, message_body, msg_id):
|
||||||
|
"""Fallback: Direkter SQS-Send (wie bisher)."""
|
||||||
|
attempt = 0
|
||||||
|
while attempt < MAX_RETRIES:
|
||||||
|
try:
|
||||||
|
sqs.send_message(
|
||||||
|
QueueUrl=queue_url,
|
||||||
|
MessageBody=message_body
|
||||||
|
)
|
||||||
|
logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}")
|
||||||
|
return
|
||||||
|
except ClientError as e:
|
||||||
|
attempt += 1
|
||||||
|
error_code = e.response['Error']['Code']
|
||||||
|
logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {error_code}")
|
||||||
|
if attempt == MAX_RETRIES:
|
||||||
|
raise
|
||||||
|
time.sleep(exponential_backoff(attempt))
|
||||||
|
|
||||||
def lambda_handler(event, context):
|
def lambda_handler(event, context):
|
||||||
"""
|
"""
|
||||||
Nimmt SES Event entgegen, extrahiert Domain dynamisch,
|
Nimmt SES Event entgegen, extrahiert Domain dynamisch.
|
||||||
verpackt Metadaten als 'Fake SNS' und sendet an die domain-spezifische SQS.
|
Strategie: SNS Publish (Fan-Out an Primary + Standby Queue).
|
||||||
Mit integrierter Retry-Logik für SQS-Send.
|
Fallback: Direkter SQS-Send falls kein SNS-Topic existiert.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
records = event.get('Records', [])
|
records = event.get('Records', [])
|
||||||
logger.info(f"Received event with {len(records)} records.")
|
logger.info(f"Received event with {len(records)} records.")
|
||||||
|
|
||||||
for record in records:
|
for record in records:
|
||||||
ses_data = record.get('ses', {})
|
ses_data = record.get('ses', {})
|
||||||
if not ses_data:
|
if not ses_data:
|
||||||
logger.warning(f"Invalid SES event: Missing 'ses' in record: {record}")
|
logger.warning(f"Invalid SES event: Missing 'ses' in record")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
mail = ses_data.get('mail', {})
|
mail = ses_data.get('mail', {})
|
||||||
receipt = ses_data.get('receipt', {})
|
receipt = ses_data.get('receipt', {})
|
||||||
|
|
||||||
# Domain extrahieren (aus erstem Recipient)
|
|
||||||
recipients = receipt.get('recipients', []) or mail.get('destination', [])
|
recipients = receipt.get('recipients', []) or mail.get('destination', [])
|
||||||
if not recipients:
|
if not recipients:
|
||||||
logger.warning("No recipients in event - skipping")
|
logger.warning("No recipients in event - skipping")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
first_recipient = recipients[0]
|
first_recipient = recipients[0]
|
||||||
domain = first_recipient.split('@')[-1].lower()
|
domain = first_recipient.split('@')[-1].lower()
|
||||||
if not domain:
|
if not domain:
|
||||||
logger.error("Could not extract domain from recipient")
|
logger.error("Could not extract domain from recipient")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Wichtige Metadaten loggen
|
|
||||||
msg_id = mail.get('messageId', 'unknown')
|
msg_id = mail.get('messageId', 'unknown')
|
||||||
source = mail.get('source', 'unknown')
|
source = mail.get('source', 'unknown')
|
||||||
logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
|
logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
|
||||||
logger.info(f" From: {source}")
|
logger.info(f" From: {source}")
|
||||||
logger.info(f" To: {recipients}")
|
logger.info(f" To: {recipients}")
|
||||||
|
|
||||||
# SES JSON als String serialisieren
|
|
||||||
ses_json_string = json.dumps(ses_data)
|
ses_json_string = json.dumps(ses_data)
|
||||||
|
|
||||||
# Payload Größe loggen und checken (Safeguard)
|
|
||||||
payload_size = len(ses_json_string.encode('utf-8'))
|
payload_size = len(ses_json_string.encode('utf-8'))
|
||||||
logger.info(f" Metadata Payload Size: {payload_size} bytes")
|
logger.info(f" Metadata Payload Size: {payload_size} bytes")
|
||||||
if payload_size > 200000: # Arbitrary Limit < SQS 256KB
|
if payload_size > 200000:
|
||||||
raise ValueError("Payload too large for SQS")
|
raise ValueError("Payload too large")
|
||||||
|
|
||||||
# Fake SNS Payload
|
|
||||||
fake_sns_payload = {
|
fake_sns_payload = {
|
||||||
"Type": "Notification",
|
"Type": "Notification",
|
||||||
"MessageId": str(uuid.uuid4()),
|
"MessageId": str(uuid.uuid4()),
|
||||||
@@ -94,30 +145,20 @@ def lambda_handler(event, context):
|
|||||||
"Message": ses_json_string,
|
"Message": ses_json_string,
|
||||||
"Timestamp": datetime.utcnow().isoformat() + "Z"
|
"Timestamp": datetime.utcnow().isoformat() + "Z"
|
||||||
}
|
}
|
||||||
|
|
||||||
# Queue URL dynamisch holen
|
message_body = json.dumps(fake_sns_payload)
|
||||||
queue_url = get_queue_url(domain)
|
|
||||||
|
# Strategie: SNS zuerst, SQS als Fallback
|
||||||
# SQS Send mit Retries
|
topic_arn = get_topic_arn(domain)
|
||||||
attempt = 0
|
sns_success = publish_to_sns(topic_arn, message_body, msg_id)
|
||||||
while attempt < MAX_RETRIES:
|
|
||||||
try:
|
if not sns_success:
|
||||||
sqs.send_message(
|
# Kein SNS-Topic für diese Domain → direkt in SQS (wie bisher)
|
||||||
QueueUrl=queue_url,
|
queue_url = get_queue_url(domain)
|
||||||
MessageBody=json.dumps(fake_sns_payload)
|
send_to_sqs(queue_url, message_body, msg_id)
|
||||||
)
|
|
||||||
logger.info(f"✅ Successfully forwarded {msg_id} to SQS: {queue_url}")
|
|
||||||
break
|
|
||||||
except ClientError as e:
|
|
||||||
attempt += 1
|
|
||||||
error_code = e.response['Error']['Code']
|
|
||||||
logger.warning(f"Retry {attempt}/{MAX_RETRIES} for SQS send: {error_code} - {str(e)}")
|
|
||||||
if attempt == MAX_RETRIES:
|
|
||||||
raise
|
|
||||||
time.sleep(exponential_backoff(attempt))
|
|
||||||
|
|
||||||
return {'status': 'ok'}
|
return {'status': 'ok'}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True)
|
logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True)
|
||||||
raise e
|
raise e
|
||||||
@@ -48,6 +48,9 @@ export const config = {
|
|||||||
// Monitoring
|
// Monitoring
|
||||||
metricsPort: parseInt(process.env.METRICS_PORT ?? '8000', 10),
|
metricsPort: parseInt(process.env.METRICS_PORT ?? '8000', 10),
|
||||||
healthPort: parseInt(process.env.HEALTH_PORT ?? '8080', 10),
|
healthPort: parseInt(process.env.HEALTH_PORT ?? '8080', 10),
|
||||||
|
|
||||||
|
queueSuffix: process.env.QUEUE_SUFFIX ?? '-queue',
|
||||||
|
standbyMode: (process.env.STANDBY_MODE ?? 'false').toLowerCase() === 'true',
|
||||||
} as const;
|
} as const;
|
||||||
|
|
||||||
export type Config = typeof config;
|
export type Config = typeof config;
|
||||||
@@ -106,7 +109,7 @@ export function isInternalAddress(email: string): boolean {
|
|||||||
|
|
||||||
/** Convert domain to SQS queue name: bizmatch.net → bizmatch-net-queue */
|
/** Convert domain to SQS queue name: bizmatch.net → bizmatch-net-queue */
|
||||||
export function domainToQueueName(domain: string): string {
|
export function domainToQueueName(domain: string): string {
|
||||||
return domain.replace(/\./g, '-') + '-queue';
|
return domain.replace(/\./g, '-') + config.queueSuffix;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert domain to S3 bucket name: bizmatch.net → bizmatch-net-emails */
|
/** Convert domain to S3 bucket name: bizmatch.net → bizmatch-net-emails */
|
||||||
|
|||||||
@@ -36,6 +36,9 @@ export class RulesProcessor {
|
|||||||
workerName: string,
|
workerName: string,
|
||||||
metricsCallback?: MetricsCallback,
|
metricsCallback?: MetricsCallback,
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
|
if (config.standbyMode) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
const rule = await this.dynamodb.getEmailRules(recipient.toLowerCase());
|
const rule = await this.dynamodb.getEmailRules(recipient.toLowerCase());
|
||||||
if (!rule) return false;
|
if (!rule) return false;
|
||||||
|
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ import { BlocklistChecker } from '../email/blocklist.js';
|
|||||||
import { BounceHandler } from '../email/bounce-handler.js';
|
import { BounceHandler } from '../email/bounce-handler.js';
|
||||||
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
|
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
|
||||||
import { RulesProcessor } from '../email/rules-processor.js';
|
import { RulesProcessor } from '../email/rules-processor.js';
|
||||||
|
import { config } from '../config.js';
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Processor
|
// Processor
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@@ -258,34 +258,40 @@ export class MessageProcessor {
|
|||||||
|
|
||||||
if (totalHandled === recipients.length) {
|
if (totalHandled === recipients.length) {
|
||||||
if (blockedRecipients.length === recipients.length) {
|
if (blockedRecipients.length === recipients.length) {
|
||||||
// All blocked
|
// All blocked — im Standby kein S3 anfassen
|
||||||
try {
|
if (!config.standbyMode) {
|
||||||
await this.s3.markAsBlocked(
|
try {
|
||||||
domain,
|
await this.s3.markAsBlocked(
|
||||||
messageId,
|
domain,
|
||||||
blockedRecipients,
|
messageId,
|
||||||
fromAddrFinal,
|
blockedRecipients,
|
||||||
workerName,
|
fromAddrFinal,
|
||||||
);
|
workerName,
|
||||||
await this.s3.deleteBlockedEmail(domain, messageId, workerName);
|
);
|
||||||
} catch (err: any) {
|
await this.s3.deleteBlockedEmail(domain, messageId, workerName);
|
||||||
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
|
} catch (err: any) {
|
||||||
return false;
|
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (successful.length > 0) {
|
} else if (successful.length > 0) {
|
||||||
await this.s3.markAsProcessed(
|
if (!config.standbyMode) {
|
||||||
domain,
|
await this.s3.markAsProcessed(
|
||||||
messageId,
|
domain,
|
||||||
workerName,
|
messageId,
|
||||||
failedPermanent.length > 0 ? failedPermanent : undefined,
|
workerName,
|
||||||
);
|
failedPermanent.length > 0 ? failedPermanent : undefined,
|
||||||
|
);
|
||||||
|
}
|
||||||
} else if (failedPermanent.length > 0) {
|
} else if (failedPermanent.length > 0) {
|
||||||
await this.s3.markAsAllInvalid(
|
if (!config.standbyMode) {
|
||||||
domain,
|
await this.s3.markAsAllInvalid(
|
||||||
messageId,
|
domain,
|
||||||
failedPermanent,
|
messageId,
|
||||||
workerName,
|
failedPermanent,
|
||||||
);
|
workerName,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Summary
|
// Summary
|
||||||
|
|||||||
Reference in New Issue
Block a user