update acc. to bounces
This commit is contained in:
164
test_extract_v2.py
Executable file
164
test_extract_v2.py
Executable file
@@ -0,0 +1,164 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script für Message-ID Extraktion - VERBESSERTE VERSION
|
||||
Kann lokal ausgeführt werden ohne AWS-Verbindung
|
||||
"""
|
||||
|
||||
import re
|
||||
from email.parser import BytesParser
|
||||
from email.policy import SMTP as SMTPPolicy
|
||||
|
||||
def log(message: str, level: str = 'INFO') -> None:
    """Print *message* to stdout prefixed with its level in square brackets.

    Minimal logger used by this test script so it can run without any
    logging infrastructure.

    Args:
        message: Text to print.
        level: Label shown in the ``[...]`` prefix (defaults to ``'INFO'``).
    """
    print(f"[{level}] {message}")
|
||||
|
||||
def extract_original_message_id(parsed):
    """Extract the SES Message-ID of the original mail from a bounce message.

    The ID format matched here is ``010f`` followed by 12 hex digits, then
    hex groups of 8-4-4-4-12 digits, and a trailing ``-000000``.

    Lookup order:

    1. The ``In-Reply-To`` and ``References`` headers.
    2. The ``Message-ID`` header of an embedded ``message/rfc822`` part.
    3. The first matching ID anywhere in the decoded ``text/*`` and
       ``application/*`` parts (or in the single body of a non-multipart
       message).

    An ID equal to the one in the message's own ``Message-ID`` header is
    always skipped, because that one identifies the bounce itself.

    Args:
        parsed: A parsed email message object (as returned by
            ``email.parser.BytesParser``).

    Returns:
        The first matching ID that differs from the message's own ID, or
        ``None`` when nothing suitable is found.
    """
    # SES Message-ID pattern (always ends with -000000).
    ses_pattern = re.compile(
        r'010f[0-9a-f]{12}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-000000'
    )

    # Message-ID of the current mail (the bounce itself) - this one is NOT wanted.
    current_msg_id = (parsed.get('Message-ID') or '').strip()
    current_match = ses_pattern.search(current_msg_id)
    current_id = current_match.group(0) if current_match else None

    log(f"Current Message-ID: {current_id}", 'DEBUG')

    # 1. Try the standard threading headers.
    for header in ('In-Reply-To', 'References'):
        value = (parsed.get(header) or '').strip()
        # References may list several IDs; inspect all of them so that a
        # leading occurrence of the bounce's own ID does not hide a later one.
        for found_id in ses_pattern.findall(value):
            if found_id != current_id:
                log(f" Found Message-ID in {header}: {found_id}")
                return found_id

    # 2. Search the complete body (including all attachments/parts).
    try:
        chunks = []

        if parsed.is_multipart():
            for part in parsed.walk():
                content_type = part.get_content_type()

                # Special case: message/rfc822 (embedded message).
                if content_type == 'message/rfc822':
                    log(" Processing embedded message/rfc822", 'DEBUG')
                    try:
                        # For this content type get_payload() yields a list
                        # holding the embedded message object.
                        payload = part.get_payload()
                        if isinstance(payload, list) and payload:
                            embedded_msg = payload[0]
                            embedded_id = (embedded_msg.get('Message-ID') or '').strip()
                            match = ses_pattern.search(embedded_id)
                            if match:
                                found_id = match.group(0)
                                log(f" Found ID in embedded msg: {found_id}", 'DEBUG')
                                # Only accept it if it is NOT the bounce's own ID.
                                if found_id != current_id:
                                    log(f" ✓ Found Message-ID in embedded message: {found_id}")
                                    return found_id
                            # Fallback: keep the embedded message as text so
                            # the body search below can still scan it.
                            chunks.append(embedded_msg.as_string())
                    except Exception as e:
                        log(f" Warning: Could not process embedded message: {e}", 'WARNING')

                # Search all other textual parts (skips binary data such as images).
                elif content_type.startswith(('text/', 'application/')):
                    try:
                        payload = part.get_payload(decode=True)
                    except Exception:
                        # Transfer decoding failed; fall back to the raw payload.
                        payload = part.get_payload()

                    if isinstance(payload, bytes):
                        # errors='ignore' never raises, so no further charset
                        # fallback is needed; the IDs searched for are ASCII.
                        chunks.append(payload.decode('utf-8', errors='ignore'))
                    elif isinstance(payload, str):
                        chunks.append(payload)
        else:
            # Non-multipart message.
            payload = parsed.get_payload(decode=True)
            if payload:
                chunks.append(payload.decode('utf-8', errors='ignore'))

        # Join with a newline so a match can never span two separate parts.
        body_text = '\n'.join(chunks)

        # Find every SES Message-ID in the body.
        matches = ses_pattern.findall(body_text)
        if matches:
            log(f" Found {len(matches)} total IDs in body: {matches}", 'DEBUG')
            # Drop the bounce's own ID.
            candidates = [m for m in matches if m != current_id]

            if candidates:
                # Take the FIRST remaining one (usually the original ID).
                log(f" Found {len(matches)} SES Message-ID(s) in body, using first (not bounce): {candidates[0]}")
                return candidates[0]
            log(f" Found {len(matches)} SES Message-ID(s) but all match the bounce ID")

    except Exception as e:
        # Best effort: a malformed body must not abort the extraction.
        log(f" Warning: Could not search body for Message-ID: {e}", 'WARNING')

    return None
|
||||
|
||||
|
||||
def test_with_file(filepath: str):
    """Run the Message-ID extraction against an email file and print a report.

    Reads the file as raw bytes, parses it with ``BytesParser`` using the
    SMTP policy, prints the main headers, then prints the result of
    ``extract_original_message_id``.

    Args:
        filepath: Path to a file containing a raw email message.

    Returns:
        Whatever ``extract_original_message_id`` returned for the parsed
        message (the extracted ID, or ``None``).
    """
    print(f"\n{'='*70}")
    print(f"Testing: {filepath}")
    print('='*70)

    with open(filepath, 'rb') as f:
        raw_bytes = f.read()

    parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)

    print("\nEmail Headers:")
    print(f" From: {parsed.get('From')}")
    print(f" To: {parsed.get('To')}")
    print(f" Subject: {parsed.get('Subject')}")
    print(f" Message-ID: {parsed.get('Message-ID')}")
    print(f" In-Reply-To: {parsed.get('In-Reply-To')}")
    print(f" References: {parsed.get('References')}")

    print("\n--- EXTRACTION ---")
    result = extract_original_message_id(parsed)

    print(f"\n{'='*70}")
    print(f"RESULT: {result}")
    print('='*70)

    return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Command-line entry point: expects the path of an email file as the
    # first argument.
    import sys

    if len(sys.argv) > 1:
        # Email file given as argument.
        result = test_with_file(sys.argv[1])

        # Exit code: 0 = success (ID found), 1 = failure (no ID).
        sys.exit(0 if result else 1)
    else:
        print("Usage: python3 test_extract_v2.py <email_file>")
        sys.exit(1)
|
||||
Reference in New Issue
Block a user