import json import os import boto3 import uuid import logging from datetime import datetime from botocore.exceptions import ClientError import time import random logger = logging.getLogger() logger.setLevel(logging.INFO) sqs = boto3.client('sqs') sns = boto3.client('sns') sts_account_id = None MAX_RETRIES = 3 BASE_BACKOFF = 1 def exponential_backoff(attempt): return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1) def get_account_id(): global sts_account_id if sts_account_id is None: sts_account_id = boto3.client('sts').get_caller_identity()['Account'] return sts_account_id def get_topic_arn(domain): """ Generiert Topic-ARN aus Domain. Konvention: domain.tld -> domain-tld-topic """ topic_name = domain.replace('.', '-') + '-topic' region = os.environ.get('AWS_REGION', 'us-east-2') account_id = get_account_id() return f"arn:aws:sns:{region}:{account_id}:{topic_name}" def get_queue_url(domain): """ Fallback: Direkter SQS-Send für Domains ohne SNS-Topic. """ queue_name = domain.replace('.', '-') + '-queue' try: response = sqs.get_queue_url(QueueName=queue_name) return response['QueueUrl'] except ClientError as e: if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': logger.error(f"Queue nicht gefunden für Domain: {domain}") raise ValueError(f"Keine Queue für Domain {domain}") else: raise def publish_to_sns(topic_arn, message_body, msg_id): """Versucht SNS Publish. Gibt True bei Erfolg, False bei Topic-Not-Found.""" attempt = 0 while attempt < MAX_RETRIES: try: sns.publish( TopicArn=topic_arn, Message=message_body ) logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}") return True except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'NotFound' or error_code == 'NotFoundException': logger.info(f"ℹ️ SNS Topic not found for {topic_arn} — falling back to SQS") return False attempt += 1 logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}") if attempt == MAX_RETRIES: raise time.sleep(exponential_backoff(attempt)) return False def send_to_sqs(queue_url, message_body, msg_id): """Fallback: Direkter SQS-Send (wie bisher).""" attempt = 0 while attempt < MAX_RETRIES: try: sqs.send_message( QueueUrl=queue_url, MessageBody=message_body ) logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}") return except ClientError as e: attempt += 1 error_code = e.response['Error']['Code'] logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {error_code}") if attempt == MAX_RETRIES: raise time.sleep(exponential_backoff(attempt)) def lambda_handler(event, context): """ Nimmt SES Event entgegen, extrahiert Domain dynamisch. Strategie: SNS Publish (Fan-Out an Primary + Standby Queue). Fallback: Direkter SQS-Send falls kein SNS-Topic existiert. """ try: records = event.get('Records', []) logger.info(f"Received event with {len(records)} records.") for record in records: ses_data = record.get('ses', {}) if not ses_data: logger.warning(f"Invalid SES event: Missing 'ses' in record") continue mail = ses_data.get('mail', {}) receipt = ses_data.get('receipt', {}) recipients = receipt.get('recipients', []) or mail.get('destination', []) if not recipients: logger.warning("No recipients in event - skipping") continue first_recipient = recipients[0] domain = first_recipient.split('@')[-1].lower() if not domain: logger.error("Could not extract domain from recipient") continue msg_id = mail.get('messageId', 'unknown') source = mail.get('source', 'unknown') logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}") logger.info(f" From: {source}") logger.info(f" To: {recipients}") ses_json_string = json.dumps(ses_data) payload_size = len(ses_json_string.encode('utf-8')) logger.info(f" Metadata Payload Size: {payload_size} bytes") if payload_size > 200000: raise ValueError("Payload too large") fake_sns_payload = { "Type": "Notification", "MessageId": str(uuid.uuid4()), "TopicArn": "arn:aws:sns:ses-shim:global-topic", "Subject": "Amazon SES Email Receipt Notification", "Message": ses_json_string, "Timestamp": datetime.utcnow().isoformat() + "Z" } message_body = json.dumps(fake_sns_payload) # Strategie: SNS zuerst, SQS als Fallback topic_arn = get_topic_arn(domain) sns_success = publish_to_sns(topic_arn, message_body, msg_id) if not sns_success: # Kein SNS-Topic für diese Domain → direkt in SQS (wie bisher) queue_url = get_queue_url(domain) send_to_sqs(queue_url, message_body, msg_id) return {'status': 'ok'} except Exception as e: logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True) raise e