aktueller Umbaustatus
This commit is contained in:
@@ -19,13 +19,48 @@ msg_table = dynamo.Table('ses-outbound-messages')
|
||||
PROCESSED_KEY = 'processed'
|
||||
PROCESSED_VALUE = 'true'
|
||||
|
||||
def is_ses_autoresponse(parsed):
|
||||
def is_ses_bounce_or_autoreply(parsed):
|
||||
"""Erkennt SES Bounces und Auto-Replies"""
|
||||
from_h = (parsed.get('From') or '').lower()
|
||||
auto_sub = (parsed.get('Auto-Submitted') or '').lower()
|
||||
return (
|
||||
'mailer-daemon@us-east-2.amazonses.com' in from_h
|
||||
and 'auto-replied' in auto_sub
|
||||
)
|
||||
|
||||
# SES MAILER-DAEMON oder Auto-Submitted Header
|
||||
is_mailer_daemon = 'mailer-daemon@' in from_h and 'amazonses.com' in from_h
|
||||
is_auto_replied = 'auto-replied' in auto_sub or 'auto-generated' in auto_sub
|
||||
|
||||
return is_mailer_daemon or is_auto_replied
|
||||
|
||||
|
||||
def extract_original_message_id(parsed):
|
||||
"""Extrahiert die ursprüngliche Message-ID aus In-Reply-To oder References"""
|
||||
# Versuche In-Reply-To
|
||||
in_reply_to = (parsed.get('In-Reply-To') or '').strip()
|
||||
if in_reply_to:
|
||||
msg_id = in_reply_to
|
||||
if msg_id.startswith('<') and '>' in msg_id:
|
||||
msg_id = msg_id[1:msg_id.find('>')]
|
||||
|
||||
# ✅ WICHTIG: Entferne @amazonses.com Suffix falls vorhanden
|
||||
if '@' in msg_id:
|
||||
msg_id = msg_id.split('@')[0]
|
||||
|
||||
return msg_id
|
||||
|
||||
# Fallback: References Header (nimm die ERSTE ID)
|
||||
refs = (parsed.get('References') or '').strip()
|
||||
if refs:
|
||||
first_ref = refs.split()[0]
|
||||
if first_ref.startswith('<') and '>' in first_ref:
|
||||
first_ref = first_ref[1:first_ref.find('>')]
|
||||
|
||||
# ✅ WICHTIG: Entferne @amazonses.com Suffix falls vorhanden
|
||||
if '@' in first_ref:
|
||||
first_ref = first_ref.split('@')[0]
|
||||
|
||||
return first_ref
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def domain_to_bucket(domain: str) -> str:
|
||||
"""Konvertiert Domain zu S3 Bucket Namen"""
|
||||
@@ -159,7 +194,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||
'bucket': bucket,
|
||||
'key': key,
|
||||
'from': from_addr,
|
||||
'recipients': recipients, # Liste aller Empfänger
|
||||
'recipients': recipients,
|
||||
'domain': domain,
|
||||
'subject': subject,
|
||||
'message_id': message_id,
|
||||
@@ -194,7 +229,6 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||
print(f"✓ Queued to {queue_name}: SQS MessageId={sqs_message_id}")
|
||||
print(f" Recipients: {len(recipients)} - {', '.join(recipients)}")
|
||||
|
||||
# Als queued markieren
|
||||
mark_as_queued(bucket, key, queue_name)
|
||||
|
||||
return sqs_message_id
|
||||
@@ -206,8 +240,7 @@ def send_to_queue(queue_url: str, bucket: str, key: str,
|
||||
|
||||
def lambda_handler(event, context):
|
||||
"""
|
||||
Lambda Handler für SES Events
|
||||
Eine Domain pro Event = eine Queue Message mit allen Recipients
|
||||
Lambda Handler für SES Inbound Events
|
||||
"""
|
||||
|
||||
print(f"{'='*70}")
|
||||
@@ -221,10 +254,7 @@ def lambda_handler(event, context):
|
||||
ses = record['ses']
|
||||
except (KeyError, IndexError) as e:
|
||||
print(f"✗ Invalid event structure: {e}")
|
||||
return {
|
||||
'statusCode': 400,
|
||||
'body': json.dumps({'error': 'Invalid SES event'})
|
||||
}
|
||||
return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid SES event'})}
|
||||
|
||||
mail = ses['mail']
|
||||
receipt = ses['receipt']
|
||||
@@ -234,21 +264,14 @@ def lambda_handler(event, context):
|
||||
timestamp = mail.get('timestamp', '')
|
||||
recipients = receipt.get('recipients', [])
|
||||
|
||||
# FRÜHES LOGGING: S3 Key und Recipients
|
||||
print(f"\n🔑 S3 Key: {message_id}")
|
||||
print(f"👥 Recipients ({len(recipients)}): {', '.join(recipients)}")
|
||||
|
||||
if not recipients:
|
||||
print(f"✗ No recipients found in event")
|
||||
return {
|
||||
'statusCode': 400,
|
||||
'body': json.dumps({
|
||||
'error': 'No recipients in event',
|
||||
'message_id': message_id
|
||||
})
|
||||
}
|
||||
return {'statusCode': 400, 'body': json.dumps({'error': 'No recipients'})}
|
||||
|
||||
# Domain extrahieren (alle Recipients haben gleiche Domain!)
|
||||
# Domain extrahieren
|
||||
domain = recipients[0].split('@')[1].lower()
|
||||
bucket = domain_to_bucket(domain)
|
||||
|
||||
@@ -257,187 +280,129 @@ def lambda_handler(event, context):
|
||||
print(f" From: {source}")
|
||||
print(f" Domain: {domain}")
|
||||
print(f" Bucket: {bucket}")
|
||||
print(f" Timestamp: {timestamp}")
|
||||
print(f" Recipients: {len(recipients)}")
|
||||
|
||||
# Queue für Domain ermitteln
|
||||
# Queue ermitteln
|
||||
try:
|
||||
queue_url = get_queue_url_for_domain(domain)
|
||||
queue_name = queue_url.split('/')[-1]
|
||||
print(f" Queue: {queue_name}")
|
||||
except Exception as e:
|
||||
print(f"\n✗ ERROR: {e}")
|
||||
return {
|
||||
'statusCode': 500,
|
||||
'body': json.dumps({
|
||||
'error': 'queue_not_configured',
|
||||
'domain': domain,
|
||||
'recipients': recipients,
|
||||
'message': str(e)
|
||||
})
|
||||
}
|
||||
print(f"\n✗ Queue ERROR: {e}")
|
||||
return {'statusCode': 500, 'body': json.dumps({'error': str(e)})}
|
||||
|
||||
# S3 Object finden
|
||||
try:
|
||||
print(f"\n📦 Searching S3...")
|
||||
response = s3.list_objects_v2(
|
||||
Bucket=bucket,
|
||||
Prefix=message_id,
|
||||
MaxKeys=1
|
||||
)
|
||||
response = s3.list_objects_v2(Bucket=bucket, Prefix=message_id, MaxKeys=1)
|
||||
|
||||
if 'Contents' not in response or not response['Contents']:
|
||||
raise Exception(f"No S3 object found for message {message_id}")
|
||||
if 'Contents' not in response:
|
||||
raise Exception(f"No S3 object found for {message_id}")
|
||||
|
||||
key = response['Contents'][0]['Key']
|
||||
size = response['Contents'][0]['Size']
|
||||
print(f" Found: s3://{bucket}/{key}")
|
||||
print(f" Size: {size:,} bytes ({size/1024:.1f} KB)")
|
||||
print(f" Found: s3://{bucket}/{key} ({size/1024:.1f} KB)")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n✗ S3 ERROR: {e}")
|
||||
return {
|
||||
'statusCode': 404,
|
||||
'body': json.dumps({
|
||||
'error': 's3_object_not_found',
|
||||
'message_id': message_id,
|
||||
'bucket': bucket,
|
||||
'details': str(e)
|
||||
})
|
||||
}
|
||||
return {'statusCode': 404, 'body': json.dumps({'error': str(e)})}
|
||||
|
||||
# Duplicate Check
|
||||
print(f"\n🔍 Checking for duplicates...")
|
||||
if is_already_processed(bucket, key):
|
||||
print(f" Already processed, skipping")
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({
|
||||
'status': 'already_processed',
|
||||
'message_id': message_id,
|
||||
'recipients': recipients
|
||||
})
|
||||
}
|
||||
return {'statusCode': 200, 'body': json.dumps({'status': 'already_processed'})}
|
||||
|
||||
# Processing Lock setzen
|
||||
print(f"\n🔒 Setting processing lock...")
|
||||
# Processing Lock
|
||||
if not set_processing_lock(bucket, key):
|
||||
print(f" Already being processed by another instance")
|
||||
return {
|
||||
'statusCode': 200,
|
||||
'body': json.dumps({
|
||||
'status': 'already_processing',
|
||||
'message_id': message_id,
|
||||
'recipients': recipients
|
||||
})
|
||||
}
|
||||
return {'statusCode': 200, 'body': json.dumps({'status': 'already_processing'})}
|
||||
|
||||
# E-Mail laden um Subject zu extrahieren
|
||||
# E-Mail laden und ggf. umschreiben
|
||||
subject = '(unknown)'
|
||||
raw_bytes = b''
|
||||
parsed = None
|
||||
modified = False
|
||||
|
||||
|
||||
try:
|
||||
print(f"\n📖 Reading email for metadata...")
|
||||
print(f"\n📖 Reading email...")
|
||||
obj = s3.get_object(Bucket=bucket, Key=key)
|
||||
raw_bytes = obj['Body'].read()
|
||||
metadata = obj.get('Metadata', {}) or {}
|
||||
|
||||
# Header parsen
|
||||
parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
|
||||
subject = parsed.get('subject', '(no subject)')
|
||||
print(f" Subject: {subject}")
|
||||
|
||||
# 🔁 SES Auto-Response erkennen
|
||||
if is_ses_autoresponse(parsed):
|
||||
print(" Detected SES auto-response (out-of-office)")
|
||||
|
||||
# Message-ID der ursprünglichen Mail aus In-Reply-To / References holen
|
||||
in_reply_to = (parsed.get('In-Reply-To') or '').strip()
|
||||
if not in_reply_to:
|
||||
refs = (parsed.get('References') or '').strip()
|
||||
# nimm die erste ID aus References
|
||||
in_reply_to = refs.split()[0] if refs else ''
|
||||
|
||||
lookup_id = ''
|
||||
if in_reply_to.startswith('<') and '>' in in_reply_to:
|
||||
lookup_id = in_reply_to[1:in_reply_to.find('>')]
|
||||
else:
|
||||
lookup_id = in_reply_to
|
||||
|
||||
original = None
|
||||
if lookup_id:
|
||||
try:
|
||||
res = msg_table.get_item(Key={'MessageId': lookup_id})
|
||||
original = res.get('Item')
|
||||
print(f" Dynamo lookup for {lookup_id}: {'hit' if original else 'miss'}")
|
||||
except Exception as e:
|
||||
print(f"⚠ Dynamo lookup failed: {e}")
|
||||
|
||||
if original:
|
||||
orig_from = original.get('source', '')
|
||||
destinations = original.get('destinations', []) or []
|
||||
# einfache Variante: nimm den ersten Empfänger
|
||||
orig_to = destinations[0] if destinations else ''
|
||||
|
||||
# Domain hast du oben bereits aus recipients[0] extrahiert
|
||||
display = f"Out of Office from {orig_to}" if orig_to else "Out of Office"
|
||||
|
||||
# ursprüngliche Infos sichern
|
||||
parsed['X-SES-Original-From'] = parsed.get('From', '')
|
||||
parsed['X-SES-Original-Recipient'] = orig_to
|
||||
|
||||
# From für den User "freundlich" machen
|
||||
parsed.replace_header('From', f'"{display}" <no-reply@{domain}>')
|
||||
|
||||
# Antworten trotzdem an den Absender deiner ursprünglichen Mail
|
||||
if orig_from:
|
||||
parsed['Reply-To'] = orig_from
|
||||
|
||||
subj = parsed.get('Subject', 'out of office')
|
||||
if not subj.lower().startswith('out of office'):
|
||||
parsed.replace_header('Subject', f"Out of office: {subj}")
|
||||
|
||||
# geänderte Mail zurück in Bytes
|
||||
raw_bytes = parsed.as_bytes()
|
||||
modified = True
|
||||
print(" Auto-response rewritten for delivery to user inbox")
|
||||
else:
|
||||
print(" No original send record found for auto-response")
|
||||
|
||||
# Wenn wir die Mail verändert haben, aktualisieren wir das S3-Objekt
|
||||
if modified:
|
||||
s3.put_object(
|
||||
Bucket=bucket,
|
||||
Key=key,
|
||||
Body=raw_bytes,
|
||||
Metadata=metadata
|
||||
)
|
||||
print(" Updated S3 object with rewritten auto-response")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ⚠ Could not parse email (continuing): {e}")
|
||||
|
||||
|
||||
# In Queue einreihen (EINE Message mit ALLEN Recipients)
|
||||
try:
|
||||
print(f"\n📤 Queuing to {queue_name}...")
|
||||
|
||||
parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
|
||||
subject = parsed.get('Subject', '(no subject)')
|
||||
print(f" Subject: {subject}")
|
||||
|
||||
# 🔁 Auto-Response / Bounce Detection
|
||||
if is_ses_bounce_or_autoreply(parsed):
|
||||
print(f" 🔍 Detected auto-response/bounce from SES")
|
||||
|
||||
# Extrahiere ursprüngliche Message-ID
|
||||
original_msg_id = extract_original_message_id(parsed)
|
||||
|
||||
if original_msg_id:
|
||||
print(f" 📋 Original MessageId: {original_msg_id}")
|
||||
|
||||
try:
|
||||
# Hole Original-Send aus DynamoDB
|
||||
result = msg_table.get_item(Key={'MessageId': original_msg_id})
|
||||
original_send = result.get('Item')
|
||||
|
||||
if original_send:
|
||||
orig_source = original_send.get('source', '')
|
||||
orig_destinations = original_send.get('destinations', [])
|
||||
|
||||
print(f" ✓ Found original send:")
|
||||
print(f" Original From: {orig_source}")
|
||||
print(f" Original To: {orig_destinations}")
|
||||
|
||||
# **WICHTIG**: Der erste Empfänger war der eigentliche Empfänger
|
||||
original_recipient = orig_destinations[0] if orig_destinations else ''
|
||||
|
||||
if original_recipient:
|
||||
# Absender umschreiben auf ursprünglichen Empfänger
|
||||
original_from = parsed.get('From', '')
|
||||
parsed['X-Original-SES-From'] = original_from
|
||||
parsed['X-Original-MessageId'] = original_msg_id
|
||||
|
||||
# **From auf den ursprünglichen Empfänger setzen**
|
||||
parsed.replace_header('From', original_recipient)
|
||||
|
||||
# Reply-To optional beibehalten
|
||||
if not parsed.get('Reply-To'):
|
||||
parsed['Reply-To'] = original_recipient
|
||||
|
||||
# Subject anpassen falls nötig
|
||||
if 'delivery status notification' in subject.lower():
|
||||
parsed.replace_header('Subject', f"Delivery Status: {orig_destinations[0]}")
|
||||
|
||||
raw_bytes = parsed.as_bytes()
|
||||
modified = True
|
||||
|
||||
print(f" ✅ Rewritten: From={original_recipient}")
|
||||
else:
|
||||
print(f" ⚠ No DynamoDB record found for {original_msg_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ⚠ DynamoDB lookup failed: {e}")
|
||||
else:
|
||||
print(f" ⚠ Could not extract original Message-ID")
|
||||
|
||||
# S3 aktualisieren falls modified
|
||||
if modified:
|
||||
s3.put_object(Bucket=bucket, Key=key, Body=raw_bytes, Metadata=metadata)
|
||||
print(f" 💾 Updated S3 object with rewritten email")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ⚠ Email parsing error: {e}")
|
||||
|
||||
# In Queue einreihen
|
||||
try:
|
||||
sqs_message_id = send_to_queue(
|
||||
queue_url=queue_url,
|
||||
bucket=bucket,
|
||||
key=key,
|
||||
from_addr=source,
|
||||
recipients=recipients, # ALLE Recipients
|
||||
recipients=recipients,
|
||||
domain=domain,
|
||||
subject=subject,
|
||||
message_id=message_id
|
||||
)
|
||||
|
||||
print(f"\n{'='*70}")
|
||||
print(f"✅ SUCCESS - Email queued for delivery")
|
||||
print(f"{'='*70}\n")
|
||||
print(f"\n✅ SUCCESS - Queued for delivery\n")
|
||||
|
||||
return {
|
||||
'statusCode': 200,
|
||||
@@ -445,26 +410,10 @@ def lambda_handler(event, context):
|
||||
'status': 'queued',
|
||||
'message_id': message_id,
|
||||
'sqs_message_id': sqs_message_id,
|
||||
'queue': queue_name,
|
||||
'domain': domain,
|
||||
'recipients': recipients,
|
||||
'recipient_count': len(recipients),
|
||||
'subject': subject
|
||||
'modified': modified
|
||||
})
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n{'='*70}")
|
||||
print(f"✗ FAILED TO QUEUE")
|
||||
print(f"{'='*70}")
|
||||
print(f"Error: {e}")
|
||||
|
||||
return {
|
||||
'statusCode': 500,
|
||||
'body': json.dumps({
|
||||
'error': 'failed_to_queue',
|
||||
'message': str(e),
|
||||
'message_id': message_id,
|
||||
'recipients': recipients
|
||||
})
|
||||
}
|
||||
print(f"\n✗ QUEUE FAILED: {e}")
|
||||
return {'statusCode': 500, 'body': json.dumps({'error': str(e)})}
|
||||
Reference in New Issue
Block a user