diff --git a/basic_setup/ses_sns_shim_global.py b/basic_setup/ses_sns_shim_global.py index 3eb945f..2bf01f0 100644 --- a/basic_setup/ses_sns_shim_global.py +++ b/basic_setup/ses_sns_shim_global.py @@ -8,24 +8,38 @@ from botocore.exceptions import ClientError import time import random -# Logging konfigurieren logger = logging.getLogger() logger.setLevel(logging.INFO) sqs = boto3.client('sqs') +sns = boto3.client('sns') +sts_account_id = None -# Retry-Konfiguration MAX_RETRIES = 3 -BASE_BACKOFF = 1 # Sekunden +BASE_BACKOFF = 1 def exponential_backoff(attempt): - """Exponential Backoff mit Jitter""" return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1) +def get_account_id(): + global sts_account_id + if sts_account_id is None: + sts_account_id = boto3.client('sts').get_caller_identity()['Account'] + return sts_account_id + +def get_topic_arn(domain): + """ + Generiert Topic-ARN aus Domain. + Konvention: domain.tld -> domain-tld-topic + """ + topic_name = domain.replace('.', '-') + '-topic' + region = os.environ.get('AWS_REGION', 'us-east-2') + account_id = get_account_id() + return f"arn:aws:sns:{region}:{account_id}:{topic_name}" + def get_queue_url(domain): """ - Generiert Queue-Namen aus Domain und holt URL. - Konvention: domain.tld -> domain-tld-queue + Fallback: Direkter SQS-Send für Domains ohne SNS-Topic. """ queue_name = domain.replace('.', '-') + '-queue' try: @@ -38,54 +52,91 @@ def get_queue_url(domain): else: raise +def publish_to_sns(topic_arn, message_body, msg_id): + """Versucht SNS Publish. Gibt True bei Erfolg, False bei Topic-Not-Found.""" + attempt = 0 + while attempt < MAX_RETRIES: + try: + sns.publish( + TopicArn=topic_arn, + Message=message_body + ) + logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}") + return True + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code == 'NotFound' or error_code == 'NotFoundException': + logger.info(f"ℹ️ SNS Topic not found for {topic_arn} — falling back to SQS") + return False + attempt += 1 + logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}") + if attempt == MAX_RETRIES: + raise + time.sleep(exponential_backoff(attempt)) + return False + +def send_to_sqs(queue_url, message_body, msg_id): + """Fallback: Direkter SQS-Send (wie bisher).""" + attempt = 0 + while attempt < MAX_RETRIES: + try: + sqs.send_message( + QueueUrl=queue_url, + MessageBody=message_body + ) + logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}") + return + except ClientError as e: + attempt += 1 + error_code = e.response['Error']['Code'] + logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {error_code}") + if attempt == MAX_RETRIES: + raise + time.sleep(exponential_backoff(attempt)) + def lambda_handler(event, context): """ - Nimmt SES Event entgegen, extrahiert Domain dynamisch, - verpackt Metadaten als 'Fake SNS' und sendet an die domain-spezifische SQS. - Mit integrierter Retry-Logik für SQS-Send. + Nimmt SES Event entgegen, extrahiert Domain dynamisch. + Strategie: SNS Publish (Fan-Out an Primary + Standby Queue). + Fallback: Direkter SQS-Send falls kein SNS-Topic existiert. """ try: records = event.get('Records', []) logger.info(f"Received event with {len(records)} records.") - + for record in records: ses_data = record.get('ses', {}) if not ses_data: - logger.warning(f"Invalid SES event: Missing 'ses' in record: {record}") + logger.warning(f"Invalid SES event: Missing 'ses' in record") continue - + mail = ses_data.get('mail', {}) receipt = ses_data.get('receipt', {}) - - # Domain extrahieren (aus erstem Recipient) + recipients = receipt.get('recipients', []) or mail.get('destination', []) if not recipients: logger.warning("No recipients in event - skipping") continue - + first_recipient = recipients[0] domain = first_recipient.split('@')[-1].lower() if not domain: logger.error("Could not extract domain from recipient") continue - - # Wichtige Metadaten loggen + msg_id = mail.get('messageId', 'unknown') source = mail.get('source', 'unknown') logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}") - logger.info(f" From: {source}") - logger.info(f" To: {recipients}") - - # SES JSON als String serialisieren + logger.info(f" From: {source}") + logger.info(f" To: {recipients}") + ses_json_string = json.dumps(ses_data) - - # Payload Größe loggen und checken (Safeguard) + payload_size = len(ses_json_string.encode('utf-8')) - logger.info(f" Metadata Payload Size: {payload_size} bytes") - if payload_size > 200000: # Arbitrary Limit < SQS 256KB - raise ValueError("Payload too large for SQS") - - # Fake SNS Payload + logger.info(f" Metadata Payload Size: {payload_size} bytes") + if payload_size > 200000: + raise ValueError("Payload too large") + fake_sns_payload = { "Type": "Notification", "MessageId": str(uuid.uuid4()), @@ -94,30 +145,20 @@ def lambda_handler(event, context): "Message": ses_json_string, "Timestamp": datetime.utcnow().isoformat() + "Z" } - - # Queue URL dynamisch holen - queue_url = get_queue_url(domain) - - # SQS Send mit Retries - attempt = 0 - while attempt < MAX_RETRIES: - try: - sqs.send_message( - QueueUrl=queue_url, - MessageBody=json.dumps(fake_sns_payload) - ) - logger.info(f"✅ Successfully forwarded {msg_id} to SQS: {queue_url}") - break - except ClientError as e: - attempt += 1 - error_code = e.response['Error']['Code'] - logger.warning(f"Retry {attempt}/{MAX_RETRIES} for SQS send: {error_code} - {str(e)}") - if attempt == MAX_RETRIES: - raise - time.sleep(exponential_backoff(attempt)) - + + message_body = json.dumps(fake_sns_payload) + + # Strategie: SNS zuerst, SQS als Fallback + topic_arn = get_topic_arn(domain) + sns_success = publish_to_sns(topic_arn, message_body, msg_id) + + if not sns_success: + # Kein SNS-Topic für diese Domain → direkt in SQS (wie bisher) + queue_url = get_queue_url(domain) + send_to_sqs(queue_url, message_body, msg_id) + return {'status': 'ok'} - + except Exception as e: logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True) raise e \ No newline at end of file diff --git a/email-worker-nodejs/src/config.ts b/email-worker-nodejs/src/config.ts index e3ff7e2..68ec5b5 100644 --- a/email-worker-nodejs/src/config.ts +++ b/email-worker-nodejs/src/config.ts @@ -48,6 +48,9 @@ export const config = { // Monitoring metricsPort: parseInt(process.env.METRICS_PORT ?? '8000', 10), healthPort: parseInt(process.env.HEALTH_PORT ?? '8080', 10), + + queueSuffix: process.env.QUEUE_SUFFIX ?? '-queue', + standbyMode: (process.env.STANDBY_MODE ?? 'false').toLowerCase() === 'true', } as const; export type Config = typeof config; @@ -106,7 +109,7 @@ export function isInternalAddress(email: string): boolean { /** Convert domain to SQS queue name: bizmatch.net → bizmatch-net-queue */ export function domainToQueueName(domain: string): string { - return domain.replace(/\./g, '-') + '-queue'; + return domain.replace(/\./g, '-') + config.queueSuffix; } /** Convert domain to S3 bucket name: bizmatch.net → bizmatch-net-emails */ diff --git a/email-worker-nodejs/src/email/rules-processor.ts b/email-worker-nodejs/src/email/rules-processor.ts index 8d40f49..7f517ea 100644 --- a/email-worker-nodejs/src/email/rules-processor.ts +++ b/email-worker-nodejs/src/email/rules-processor.ts @@ -36,6 +36,9 @@ export class RulesProcessor { workerName: string, metricsCallback?: MetricsCallback, ): Promise { + if (config.standbyMode) { + return false; + } const rule = await this.dynamodb.getEmailRules(recipient.toLowerCase()); if (!rule) return false;