Files
email-amazon/basic_setup/ses_sns_shim_global.py
2026-04-03 17:03:14 -05:00

164 lines
5.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError
# Root logger with its level set to INFO; every function in this module logs through it.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWS clients are created once at import time and shared by the functions below.
sqs = boto3.client('sqs')
sns = boto3.client('sns')
# Cache for the AWS account id; filled on first use by get_account_id().
sts_account_id = None
# Maximum number of attempts for one SNS publish / SQS send.
MAX_RETRIES = 3
# Base delay in seconds that exponential_backoff() scales by 2 ** attempt.
BASE_BACKOFF = 1
def exponential_backoff(attempt):
    """Return the number of seconds to wait before the next retry.

    The delay is ``BASE_BACKOFF * 2 ** attempt`` plus a random jitter drawn
    from ``random.uniform(0, 1)``.
    """
    scaled_delay = BASE_BACKOFF * (2 ** attempt)
    jitter = random.uniform(0, 1)
    return scaled_delay + jitter
def get_account_id():
    """Return the AWS account id of the caller, querying STS only once.

    The first call asks STS for the caller identity and stores the
    ``Account`` value in the module-level ``sts_account_id``; later calls
    return that cached value.
    """
    global sts_account_id
    if sts_account_id is not None:
        return sts_account_id
    caller_identity = boto3.client('sts').get_caller_identity()
    sts_account_id = caller_identity['Account']
    return sts_account_id
def get_topic_arn(domain):
    """Build the SNS topic ARN for a mail domain.

    Naming convention: ``domain.tld`` -> ``domain-tld-topic``. The region is
    read from the ``AWS_REGION`` environment variable (default
    ``us-east-2``) and the account id comes from ``get_account_id()``.
    """
    topic_name = "{}-topic".format(domain.replace('.', '-'))
    region = os.environ.get('AWS_REGION', 'us-east-2')
    account_id = get_account_id()
    return "arn:aws:sns:{}:{}:{}".format(region, account_id, topic_name)
def get_queue_url(domain):
    """Resolve the URL of the SQS queue that belongs to a mail domain.

    The queue name is the domain with dots replaced by hyphens plus the
    ``-queue`` suffix. The handler uses this on its SQS fallback path.

    Raises:
        ValueError: if SQS reports that the queue does not exist.
        ClientError: any other SQS error is re-raised unchanged.
    """
    queue_name = "{}-queue".format(domain.replace('.', '-'))
    try:
        return sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
    except ClientError as e:
        # Only the "queue does not exist" case is translated; everything
        # else propagates to the caller as-is.
        if e.response['Error']['Code'] != 'AWS.SimpleQueueService.NonExistentQueue':
            raise
        logger.error(f"Queue nicht gefunden für Domain: {domain}")
        raise ValueError(f"Keine Queue für Domain {domain}")
def publish_to_sns(topic_arn, message_body, msg_id):
    """Publish a message to an SNS topic, retrying on client errors.

    Args:
        topic_arn: ARN of the target topic.
        message_body: String passed to SNS as ``Message``.
        msg_id: Identifier used in log lines only.

    Returns:
        True when the publish succeeded. False when SNS answered with a
        not-found or authorization error code, which tells the caller to
        fall back to SQS (also False if ``MAX_RETRIES`` allows no attempt).

    Raises:
        ClientError: the last error, once ``MAX_RETRIES`` attempts failed.
    """
    fallback_codes = ('NotFound', 'NotFoundException', 'AuthorizationError')
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sns.publish(TopicArn=topic_arn, Message=message_body)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            # Fall back to SQS when the topic is missing OR access is denied;
            # these cases are not retried.
            if error_code in fallback_codes:
                logger.info(f" SNS unavailable for {topic_arn} ({error_code}) — falling back to SQS")
                return False
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}")
            return True
    return False
def send_to_sqs(queue_url, message_body, msg_id):
    """Send a message straight to an SQS queue, retrying on client errors.

    This is the fallback delivery path used when SNS is not available.

    Args:
        queue_url: URL of the target queue.
        message_body: String passed to SQS as ``MessageBody``.
        msg_id: Identifier used in log lines only.

    Raises:
        ClientError: the last error, once ``MAX_RETRIES`` attempts failed.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {error_code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}")
            return
def lambda_handler(event, context):
    """Forward SES receipt events to the matching per-domain SNS topic.

    For each record the recipient domain is extracted dynamically, the SES
    data is wrapped in an SNS-notification-shaped envelope and published to
    the domain's SNS topic (fan-out to primary + standby queue). If SNS is
    not available for the domain, the message is sent directly to the
    domain's SQS queue instead.

    Args:
        event: Lambda invocation payload; its ``Records`` list is processed.
        context: Lambda context object (unused).

    Returns:
        ``{'status': 'ok'}`` after all records were forwarded or skipped.

    Raises:
        ValueError: if the serialized SES data exceeds 200000 bytes, or if
            the SQS fallback finds no queue for the domain.
        Exception: any other failure is logged with its traceback and
            re-raised so the invocation is reported as failed.
    """
    try:
        records = event.get('Records', [])
        logger.info(f"Received event with {len(records)} records.")

        for record in records:
            ses_data = record.get('ses', {})
            if not ses_data:
                logger.warning("Invalid SES event: Missing 'ses' in record")
                continue

            mail = ses_data.get('mail', {})
            receipt = ses_data.get('receipt', {})
            recipients = receipt.get('recipients', []) or mail.get('destination', [])
            if not recipients:
                logger.warning("No recipients in event - skipping")
                continue

            # NOTE(review): routing uses only the first recipient's domain;
            # confirm that a message addressed to several hosted domains is
            # meant to reach just one topic/queue.
            first_recipient = recipients[0]
            # rpartition makes a missing '@' detectable; taking the last
            # element of split('@') would return the whole address and the
            # guard below would never fire for such input.
            _, at_sign, domain = first_recipient.rpartition('@')
            domain = domain.lower()
            if not at_sign or not domain:
                logger.error("Could not extract domain from recipient")
                continue

            msg_id = mail.get('messageId', 'unknown')
            source = mail.get('source', 'unknown')
            logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
            logger.info(f" From: {source}")
            logger.info(f" To: {recipients}")

            ses_json_string = json.dumps(ses_data)
            payload_size = len(ses_json_string.encode('utf-8'))
            logger.info(f" Metadata Payload Size: {payload_size} bytes")
            # Presumably headroom below the SNS/SQS message size limit;
            # the exact threshold rationale is not visible here.
            if payload_size > 200000:
                raise ValueError("Payload too large")

            # Timezone-aware replacement for the deprecated utcnow(); the
            # tzinfo is dropped again so the emitted string keeps the
            # previous "<naive ISO timestamp>Z" format.
            utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
            fake_sns_payload = {
                "Type": "Notification",
                "MessageId": str(uuid.uuid4()),
                "TopicArn": "arn:aws:sns:ses-shim:global-topic",
                "Subject": "Amazon SES Email Receipt Notification",
                "Message": ses_json_string,
                "Timestamp": utc_now.isoformat() + "Z",
            }
            message_body = json.dumps(fake_sns_payload)

            # Strategy: SNS first, SQS as fallback.
            topic_arn = get_topic_arn(domain)
            sns_success = publish_to_sns(topic_arn, message_body, msg_id)
            if not sns_success:
                # SNS not usable for this domain -> send directly to SQS.
                queue_url = get_queue_url(domain)
                send_to_sqs(queue_url, message_body, msg_id)

        return {'status': 'ok'}
    except Exception as e:
        logger.exception(f"❌ Critical Error in Lambda Shim: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise