164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime
from datetime import timezone

import boto3
from botocore.exceptions import ClientError
|
||
|
||
# Retry policy shared by the SNS and SQS senders. BASE_BACKOFF is the base
# of the delay (in seconds) computed by exponential_backoff().
MAX_RETRIES = 3
BASE_BACKOFF = 1

# Root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Module-level AWS clients, created once at import and shared by the
# functions below.
sns = boto3.client('sns')
sqs = boto3.client('sqs')

# Cache slot for the account ID; filled on first use by get_account_id().
sts_account_id = None
|
||
|
||
def exponential_backoff(attempt):
    """Return the delay in seconds to wait before the next retry.

    The delay is ``BASE_BACKOFF`` doubled ``attempt`` times, plus a random
    jitter between 0 and 1.
    """
    doubled = BASE_BACKOFF * (2 ** attempt)
    jitter = random.uniform(0, 1)
    return doubled + jitter
|
||
|
||
def get_account_id():
    """Return the caller's AWS account ID, caching it after the first lookup.

    The first successful call asks STS (``get_caller_identity``) and stores
    the result in the module-level ``sts_account_id``; later calls return the
    stored value without contacting STS again.
    """
    global sts_account_id
    if sts_account_id is not None:
        return sts_account_id

    identity = boto3.client('sts').get_caller_identity()
    sts_account_id = identity['Account']
    return sts_account_id
|
||
|
||
def get_topic_arn(domain):
    """Build the SNS topic ARN for a mail domain.

    Naming convention: ``domain.tld`` -> ``domain-tld-topic``. The region
    comes from the ``AWS_REGION`` environment variable (default
    ``us-east-2``) and the account ID from ``get_account_id()``.
    """
    name = '-'.join(domain.split('.')) + '-topic'
    aws_region = os.environ.get('AWS_REGION', 'us-east-2')
    return f"arn:aws:sns:{aws_region}:{get_account_id()}:{name}"
|
||
|
||
def get_queue_url(domain):
    """Resolve the URL of the SQS queue that belongs to a mail domain.

    Used by the direct-to-SQS fallback for domains without an SNS topic.
    Naming convention: ``domain.tld`` -> ``domain-tld-queue``.

    Args:
        domain: Recipient domain, e.g. ``example.com``.

    Returns:
        The queue URL reported by SQS.

    Raises:
        ValueError: If no queue with the derived name exists.
        ClientError: For any other SQS error (re-raised unchanged).
    """
    # botocore may report a missing queue under the legacy query-protocol
    # code or under the modelled exception name; accept both.
    missing_queue_codes = (
        'AWS.SimpleQueueService.NonExistentQueue',
        'QueueDoesNotExist',
    )
    queue_name = domain.replace('.', '-') + '-queue'
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response['QueueUrl']
    except ClientError as e:
        if e.response['Error']['Code'] not in missing_queue_codes:
            raise
        logger.error(f"Queue nicht gefunden für Domain: {domain}")
        # Chain explicitly so the original SQS error stays attached as cause.
        raise ValueError(f"Keine Queue für Domain {domain}") from e
|
||
|
||
def publish_to_sns(topic_arn, message_body, msg_id):
    """Publish a message to an SNS topic, retrying failed attempts.

    Returns:
        True once the publish succeeded. False when SNS reports the topic as
        not found or not authorized, which tells the caller to fall back to
        SQS.

    Raises:
        ClientError: The last error, once ``MAX_RETRIES`` attempts failed.
    """
    # Fall back to SQS when the topic does not exist OR permission is missing.
    fallback_codes = ('NotFound', 'NotFoundException', 'AuthorizationError')

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sns.publish(TopicArn=topic_arn, Message=message_body)
        except ClientError as err:
            error_code = err.response['Error']['Code']
            if error_code in fallback_codes:
                logger.info(f"ℹ️ SNS unavailable for {topic_arn} ({error_code}) — falling back to SQS")
                return False

            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}")
            return True

    # Only reached when MAX_RETRIES allows no attempt at all.
    return False
|
||
|
||
def send_to_sqs(queue_url, message_body, msg_id):
    """Fallback path: send the message directly to an SQS queue.

    Retries up to ``MAX_RETRIES`` times, sleeping for
    ``exponential_backoff(attempt)`` between attempts, and re-raises the
    last ``ClientError`` when every attempt failed.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
        except ClientError as err:
            error_code = err.response['Error']['Code']
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {error_code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}")
            return
|
||
|
||
def lambda_handler(event, context):
    """Forward SES receipt events to the per-domain SNS topic or SQS queue.

    Takes an SES event and extracts the recipient domain dynamically.
    Strategy: SNS publish (fan-out to primary + standby queue).
    Fallback: direct SQS send when SNS is not available for the domain.

    Args:
        event: Lambda event; each entry of ``Records`` is expected to carry
            an ``ses`` dict with ``mail`` and ``receipt`` sections.
        context: Lambda context object (unused).

    Returns:
        ``{'status': 'ok'}`` after all records were handled.

    Raises:
        ValueError: If the serialized SES data exceeds 200000 bytes, or if
            the fallback finds no queue for the domain.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # `or []` also covers an explicit "Records": null in the event.
        records = event.get('Records') or []
        logger.info(f"Received event with {len(records)} records.")

        for record in records:
            ses_data = record.get('ses', {})
            if not ses_data:
                logger.warning("Invalid SES event: Missing 'ses' in record")
                continue

            mail = ses_data.get('mail', {})
            receipt = ses_data.get('receipt', {})

            recipients = receipt.get('recipients', []) or mail.get('destination', [])
            if not recipients:
                logger.warning("No recipients in event - skipping")
                continue

            # Routing uses the first recipient only. rpartition lets us tell
            # "no @ at all" apart from a real domain, so a malformed address
            # is skipped instead of being used whole as the domain.
            first_recipient = recipients[0]
            _, at_sign, domain = first_recipient.rpartition('@')
            domain = domain.lower()
            if not at_sign or not domain:
                logger.error("Could not extract domain from recipient")
                continue

            msg_id = mail.get('messageId', 'unknown')
            source = mail.get('source', 'unknown')
            logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
            logger.info(f"  From: {source}")
            logger.info(f"  To: {recipients}")

            ses_json_string = json.dumps(ses_data)

            # Size guard on the raw SES JSON.
            # NOTE(review): the limit is checked before the envelope is
            # added, and JSON-escaping the string inside the envelope makes
            # the final body larger - confirm 200000 leaves enough headroom
            # for the SNS/SQS message size limit.
            payload_size = len(ses_json_string.encode('utf-8'))
            logger.info(f"  Metadata Payload Size: {payload_size} bytes")
            if payload_size > 200000:
                raise ValueError("Payload too large")

            # Aware UTC clock instead of the deprecated datetime.utcnow();
            # tzinfo is dropped so the text stays "<naive ISO>Z" as before.
            timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

            # Envelope shaped like an SNS notification, with the SES data as
            # the JSON string in "Message".
            fake_sns_payload = {
                "Type": "Notification",
                "MessageId": str(uuid.uuid4()),
                "TopicArn": "arn:aws:sns:ses-shim:global-topic",
                "Subject": "Amazon SES Email Receipt Notification",
                "Message": ses_json_string,
                "Timestamp": timestamp,
            }

            message_body = json.dumps(fake_sns_payload)

            # Strategy: SNS first, SQS as fallback.
            topic_arn = get_topic_arn(domain)
            sns_success = publish_to_sns(topic_arn, message_body, msg_id)

            if not sns_success:
                # SNS not available for this domain -> send straight to SQS.
                queue_url = get_queue_url(domain)
                send_to_sqs(queue_url, message_body, msg_id)

        return {'status': 'ok'}

    except Exception as e:
        logger.error(f"❌ Critical Error in Lambda Shim: {str(e)}", exc_info=True)
        # Bare raise keeps the original traceback intact.
        raise