@@ -8,24 +8,38 @@ from botocore.exceptions import ClientError
import time
import random
# Configure logging for the Lambda runtime (root logger, INFO level)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS service clients, created once per Lambda container
sqs = boto3.client('sqs')
sns = boto3.client('sns')

# Cached STS account id (filled lazily by get_account_id)
sts_account_id = None

# Retry configuration
MAX_RETRIES = 3
BASE_BACKOFF = 1  # seconds
def exponential_backoff(attempt):
    """Return the backoff delay in seconds for the given retry attempt.

    Doubles BASE_BACKOFF per attempt and adds random jitter in [0, 1)
    to avoid synchronized retries.
    """
    jitter = random.uniform(0, 1)
    return BASE_BACKOFF * 2 ** attempt + jitter
def get_account_id():
    """Return the caller's AWS account id, caching the STS lookup.

    The result is stored in the module-level ``sts_account_id`` so the
    STS call happens at most once per Lambda container.
    """
    global sts_account_id
    if sts_account_id is not None:
        return sts_account_id
    sts_account_id = boto3.client('sts').get_caller_identity()['Account']
    return sts_account_id
def get_topic_arn(domain, account_id=None):
    """Build the SNS topic ARN for a recipient domain.

    Convention: domain.tld -> domain-tld-topic

    Args:
        domain: Recipient domain, e.g. ``"example.com"``.
        account_id: Optional AWS account id override. When ``None`` the id
            is resolved (and cached) via STS through ``get_account_id()``;
            passing it explicitly avoids the STS round-trip.

    Returns:
        The fully qualified topic ARN string. The region comes from the
        ``AWS_REGION`` environment variable, defaulting to ``us-east-2``.
    """
    topic_name = domain.replace('.', '-') + '-topic'
    region = os.environ.get('AWS_REGION', 'us-east-2')
    if account_id is None:
        account_id = get_account_id()
    return f"arn:aws:sns:{region}:{account_id}:{topic_name}"
def get_queue_url ( domain ) :
"""
Generiert Queue-Namen aus Domain und holt URL .
Konvention: domain.tld -> domain-tld-queue
Fallback: Direkter SQS-Send für Domains ohne SNS-Topic .
"""
queue_name = domain . replace ( ' . ' , ' - ' ) + ' -queue '
try :
@@ -38,11 +52,53 @@ def get_queue_url(domain):
else :
raise
def publish_to_sns(topic_arn, message_body, msg_id):
    """Attempt an SNS publish with retries.

    Returns True on success and False when the topic does not exist
    (caller falls back to direct SQS). Transient ClientErrors are retried
    up to MAX_RETRIES with exponential backoff; the last error is re-raised.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sns.publish(
                TopicArn=topic_arn,
                Message=message_body
            )
            logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}")
            return True
        except ClientError as exc:
            code = exc.response['Error']['Code']
            # Missing topic is not an error worth retrying — signal fallback.
            if code in ('NotFound', 'NotFoundException'):
                logger.info(f"ℹ️ SNS Topic not found for {topic_arn} — falling back to SQS")
                return False
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
    return False
def send_to_sqs(queue_url, message_body, msg_id):
    """Fallback path: send the message body directly to SQS.

    Retries transient ClientErrors up to MAX_RETRIES with exponential
    backoff and re-raises the final failure.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=message_body
            )
            logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}")
            return
        except ClientError as exc:
            code = exc.response['Error']['Code']
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
def lambda_handler ( event , context ) :
"""
Nimmt SES Event entgegen, extrahiert Domain dynamisch,
verpackt Metadaten als ' Fake SNS ' und sendet an die domain-spezifische SQS .
Mit integrierter Retry-Logik für SQ S-Send .
Nimmt SES Event entgegen, extrahiert Domain dynamisch.
Strategie: SNS Publish (Fan-Out an Primary + Standby Queue) .
Fallback: Direkter SQS-Send falls kein SN S-Topic existiert .
"""
try :
records = event . get ( ' Records ' , [ ] )
@@ -51,13 +107,12 @@ def lambda_handler(event, context):
for record in records :
ses_data = record . get ( ' ses ' , { } )
if not ses_data :
logger . warning ( f " Invalid SES event: Missing ' ses ' in record: { record } " )
logger . warning ( f " Invalid SES event: Missing ' ses ' in record " )
continue
mail = ses_data . get ( ' mail ' , { } )
receipt = ses_data . get ( ' receipt ' , { } )
# Domain extrahieren (aus erstem Recipient)
recipients = receipt . get ( ' recipients ' , [ ] ) or mail . get ( ' destination ' , [ ] )
if not recipients :
logger . warning ( " No recipients in event - skipping " )
@@ -69,23 +124,19 @@ def lambda_handler(event, context):
logger . error ( " Could not extract domain from recipient " )
continue
# Wichtige Metadaten loggen
msg_id = mail . get ( ' messageId ' , ' unknown ' )
source = mail . get ( ' source ' , ' unknown ' )
logger . info ( f " Processing Message-ID: { msg_id } for domain: { domain } " )
logger . info ( f " From: { source } " )
logger . info ( f " To: { recipients } " )
logger . info ( f " From: { source } " )
logger . info ( f " To: { recipients } " )
# SES JSON als String serialisieren
ses_json_string = json . dumps ( ses_data )
# Payload Größe loggen und checken (Safeguard)
payload_size = len ( ses_json_string . encode ( ' utf-8 ' ) )
logger . info ( f " Metadata Payload Size: { payload_size } bytes " )
if payload_size > 200000 : # Arbitrary Limit < SQS 256KB
raise ValueError ( " Payload too large for SQS " )
logger . info ( f " Metadata Payload Size: { payload_size } bytes " )
if payload_size > 200000 :
raise ValueError ( " Payload too large " )
# Fake SNS Payload
fake_sns_payload = {
" Type " : " Notification " ,
" MessageId " : str ( uuid . uuid4 ( ) ) ,
@@ -95,26 +146,16 @@ def lambda_handler(event, context):
" Timestamp " : datetime . utcnow ( ) . isoformat ( ) + " Z "
}
# Queue URL dynamisch holen
queue_url = get_queue_url ( domain )
message_body = json . dumps ( fake_sns_payload )
# SQS Send mit Retries
attempt = 0
while attempt < MAX_RETRIES :
try :
sqs . send_message (
QueueUrl = queue_url ,
MessageBody = json . dumps ( fake_sns_payload )
)
logger . info ( f " ✅ Successfully forwarded { msg_id } to SQS: { queue_url } " )
break
except ClientError as e :
attempt + = 1
error_code = e . response [ ' Error ' ] [ ' Code ' ]
logger . warning ( f " Retry { attempt } / { MAX_RETRIES } for SQS send: { error_code } - { str ( e ) } " )
if attempt == MAX_RETRIES :
raise
time . sleep ( exponential_backoff ( attempt ) )
# Strategie: SNS zuerst, SQS als Fallback
topic_arn = get_topic_arn ( domain )
sns_success = publish_to_sns ( topic_arn , message_body , msg_id )
if not sns_success :
# Kein SNS-Topic für diese Domain → direkt in SQS (wie bisher)
queue_url = get_queue_url ( domain )
send_to_sqs ( queue_url , message_body , msg_id )
return { ' status ' : ' ok ' }