cleanwork

This commit is contained in:
2026-01-12 17:19:44 -06:00
parent 87e00ae867
commit be9c5b4ceb
17 changed files with 0 additions and 324 deletions

View File

@@ -0,0 +1,26 @@
FROM python:3.11-slim
# Metadata
LABEL maintainer="your-email@example.com"
LABEL description="Domain-specific email worker for SMTP delivery"
# Non-root user für Security
RUN useradd -m -u 1000 worker && \
mkdir -p /app && \
chown -R worker:worker /app
# Boto3 installieren
RUN pip install --no-cache-dir boto3
# Worker Code
COPY --chown=worker:worker worker.py /app/worker.py
WORKDIR /app
USER worker
# Healthcheck
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
CMD pgrep -f worker.py || exit 1
# Start worker mit unbuffered output
CMD ["python", "-u", "worker.py"]

View File

@@ -0,0 +1,53 @@
services:
worker:
image: python:3.11-slim
container_name: email-worker-${WORKER_DOMAIN}
restart: unless-stopped
network_mode: host # Zugriff auf lokales Netzwerk für Postfix
# Worker-Code mounten
volumes:
- ./worker.py:/app/worker.py:ro
working_dir: /app
# Python Dependencies installieren und Worker starten
command: >
sh -c "apt-get update &&
apt-get install -y --no-install-recommends procps &&
rm -rf /var/lib/apt/lists/* &&
pip install --no-cache-dir boto3 &&
python -u worker.py"
environment:
# ⚠️ WICHTIG: WORKER_DOMAIN muss von außen gesetzt werden!
- WORKER_DOMAIN=${WORKER_DOMAIN}
# AWS Credentials
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
# Worker Settings
- POLL_INTERVAL=${POLL_INTERVAL:-20}
- MAX_MESSAGES=${MAX_MESSAGES:-10}
- VISIBILITY_TIMEOUT=${VISIBILITY_TIMEOUT:-300}
# SMTP Configuration
- SMTP_HOST=${SMTP_HOST:-localhost}
- SMTP_PORT=${SMTP_PORT:-25}
- SMTP_USE_TLS=${SMTP_USE_TLS:-false}
- SMTP_USER=${SMTP_USER:-}
- SMTP_PASS=${SMTP_PASS:-}
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "5"
healthcheck:
test: ["CMD", "pgrep", "-f", "worker.py"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s

View File

@@ -0,0 +1,30 @@
#!/bin/bash
# manage-worker.sh
DOMAIN=$1
if [ -z "$DOMAIN" ]; then
echo "Usage: $0 <domain> [action]"
echo "Example: $0 andreasknuth.de"
echo " $0 andreasknuth.de down"
echo " $0 andreasknuth.de logs -f"
exit 1
fi
# Entfernt den ersten Parameter ($1 / DOMAIN) aus der Argumentenliste
shift
# Nimm ALLE verbleibenden Argumente ($@). Wenn keine da sind, nimm "up -d".
ACTION="${@:-up -d}"
PROJECT_NAME="${DOMAIN//./-}"
ENV_FILE=".env.${DOMAIN}"
if [ ! -f "$ENV_FILE" ]; then
echo "Error: $ENV_FILE not found!"
exit 1
fi
# $ACTION wird hier nicht in Anführungszeichen gesetzt,
# damit "logs -f" als zwei separate Befehle erkannt wird.
docker compose -p "$PROJECT_NAME" --env-file "$ENV_FILE" $ACTION

View File

@@ -0,0 +1,19 @@
#!/bin/bash
# update-all-workers.sh (smart version)
DOMAINS=$(docker ps --filter "name=email-worker-" --format "{{.Names}}" | sed 's/email-worker-//')
if [ -z "$DOMAINS" ]; then
echo "No workers found"
exit 1
fi
echo "Found workers: $DOMAINS"
echo ""
for domain in $DOMAINS; do
echo "═══ $domain ═══"
./manage-worker.sh "$domain" restart
done
echo "✓ Done"

885
deprecated_worker/worker.py Executable file
View File

@@ -0,0 +1,885 @@
import os
import sys
import boto3
import smtplib
import json
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime
from botocore.exceptions import ClientError # Neu: Korrekter Import für SES-Exceptions
# AWS Configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)
ses = boto3.client('ses', region_name=AWS_REGION) # Neu: Für OOO/Forwards
# ✨ Worker Configuration (domain-spezifisch)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN') # z.B. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')
# Worker Settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
# SMTP Configuration (einfach, da nur 1 Domain pro Worker)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')
# Graceful shutdown
shutdown_requested = False
# DynamoDB Ressource für Bounce-Lookup
# DynamoDB Ressource für Bounce-Lookup und Rules
try:
dynamo = boto3.resource('dynamodb', region_name=AWS_REGION)
msg_table = dynamo.Table('ses-outbound-messages')
rules_table = dynamo.Table('email-rules') # Neu: Für OOO/Forwards
except Exception as e:
log(f"Warning: Could not connect to DynamoDB: {e}", 'WARNING')
msg_table = None
rules_table = None
def get_bucket_name(domain):
"""Konvention: domain.tld -> domain-tld-emails"""
return domain.replace('.', '-') + '-emails'
def is_ses_bounce_notification(parsed):
"""
Prüft ob Email von SES MAILER-DAEMON ist
"""
from_h = (parsed.get('From') or '').lower()
return 'mailer-daemon@us-east-2.amazonses.com' in from_h
def get_bounce_info_from_dynamodb(message_id, max_retries=3, retry_delay=1):
"""
Sucht Bounce-Info in DynamoDB anhand der Message-ID
Mit Retry-Logik für Timing-Issues
Returns: dict mit bounce info oder None
"""
import time
for attempt in range(max_retries):
try:
response = msg_table.get_item(Key={'MessageId': message_id})
item = response.get('Item')
if item:
# Gefunden!
return {
'original_source': item.get('original_source', ''),
'bounceType': item.get('bounceType', 'Unknown'),
'bounceSubType': item.get('bounceSubType', 'Unknown'),
'bouncedRecipients': item.get('bouncedRecipients', []),
'timestamp': item.get('timestamp', '')
}
# Nicht gefunden - Retry falls nicht letzter Versuch
if attempt < max_retries - 1:
log(f" Bounce record not found yet, retrying in {retry_delay}s (attempt {attempt + 1}/{max_retries})...")
time.sleep(retry_delay)
else:
log(f"⚠ No bounce record found after {max_retries} attempts for Message-ID: {message_id}")
return None
except Exception as e:
log(f"⚠ DynamoDB Error (attempt {attempt + 1}/{max_retries}): {e}", 'ERROR')
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
return None
return None
def apply_bounce_logic(parsed, subject):
"""
Prüft auf SES Bounce, sucht in DynamoDB und schreibt Header um.
Returns: (parsed_email_object, was_modified_bool)
"""
if not is_ses_bounce_notification(parsed):
return parsed, False
log("🔍 Detected SES MAILER-DAEMON bounce notification")
# Message-ID aus Header extrahieren
message_id = (parsed.get('Message-ID') or '').strip('<>').split('@')[0]
if not message_id:
log("⚠ Could not extract Message-ID from bounce notification")
return parsed, False
log(f" Looking up Message-ID: {message_id}")
# Lookup in DynamoDB
bounce_info = get_bounce_info_from_dynamodb(message_id)
if not bounce_info:
return parsed, False
# Bounce Info ausgeben
original_source = bounce_info['original_source']
bounced_recipients = bounce_info['bouncedRecipients']
bounce_type = bounce_info['bounceType']
bounce_subtype = bounce_info['bounceSubType']
log(f"✓ Found bounce info:")
log(f" Original sender: {original_source}")
log(f" Bounce type: {bounce_type}/{bounce_subtype}")
log(f" Bounced recipients: {bounced_recipients}")
# Nehme den ersten bounced recipient als neuen Absender
# (bei Multiple Recipients kann es mehrere geben)
if bounced_recipients:
new_from = bounced_recipients[0]
# Rewrite Headers
parsed['X-Original-SES-From'] = parsed.get('From', '')
parsed['X-Bounce-Type'] = f"{bounce_type}/{bounce_subtype}"
parsed.replace_header('From', new_from)
if not parsed.get('Reply-To'):
parsed['Reply-To'] = new_from
# Subject anpassen
if 'delivery status notification' in subject.lower() or 'thanks for your submission' in subject.lower():
parsed.replace_header('Subject', f"Delivery Status: {new_from}")
log(f"✓ Rewritten FROM: {new_from}")
return parsed, True
log("⚠ No bounced recipients found in bounce info")
return parsed, False
def signal_handler(signum, frame):
global shutdown_requested
print(f"\n⚠ Shutdown signal received (signal {signum})")
shutdown_requested = True
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def log(message: str, level: str = 'INFO'):
"""Structured logging with timestamp"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)
def domain_to_queue_name(domain: str) -> str:
"""Konvertiert Domain zu SQS Queue Namen"""
return domain.replace('.', '-') + '-queue'
def get_queue_url() -> str:
"""Ermittelt Queue-URL für die konfigurierte Domain"""
queue_name = domain_to_queue_name(WORKER_DOMAIN)
try:
response = sqs.get_queue_url(QueueName=queue_name)
return response['QueueUrl']
except Exception as e:
raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}")
def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
"""
Markiert E-Mail als erfolgreich zugestellt
Wird nur aufgerufen wenn mindestens 1 Recipient erfolgreich war
"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = WORKER_NAME
metadata['status'] = 'delivered'
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
# Invalid inboxes speichern falls vorhanden
if invalid_inboxes:
metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
except Exception as e:
log(f"Failed to mark as processed: {e}", 'WARNING')
def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
"""
Markiert E-Mail als fehlgeschlagen weil alle Recipients ungültig sind
"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = WORKER_NAME
metadata['status'] = 'failed'
metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')
except Exception as e:
log(f"Failed to mark as all invalid: {e}", 'WARNING')
def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
"""
Markiert E-Mail als komplett fehlgeschlagen
Wird nur aufgerufen wenn ALLE Recipients fehlschlagen
"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['status'] = 'failed'
metadata['failed_at'] = str(int(time.time()))
metadata['failed_by'] = WORKER_NAME
metadata['error'] = error[:500] # S3 Metadata limit
metadata['retry_count'] = str(receive_count)
metadata.pop('processing_started', None)
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
except Exception as e:
log(f"Failed to mark as failed: {e}", 'WARNING')
def is_temporary_smtp_error(error_msg: str) -> bool:
"""
Prüft ob SMTP-Fehler temporär ist (Retry sinnvoll)
4xx Codes = temporär, 5xx = permanent
"""
temporary_indicators = [
'421', # Service not available
'450', # Mailbox unavailable
'451', # Local error
'452', # Insufficient storage
'4', # Generisch 4xx
'timeout',
'connection refused',
'connection reset',
'network unreachable',
'temporarily',
'try again'
]
error_lower = error_msg.lower()
return any(indicator in error_lower for indicator in temporary_indicators)
def is_permanent_recipient_error(error_msg: str) -> bool:
"""
Prüft ob Fehler permanent für diesen Recipient ist (Inbox existiert nicht)
550 = Mailbox not found, 551 = User not local, 553 = Mailbox name invalid
"""
permanent_indicators = [
'550', # Mailbox unavailable / not found
'551', # User not local
'553', # Mailbox name not allowed / invalid
'mailbox not found',
'user unknown',
'no such user',
'recipient rejected',
'does not exist',
'invalid recipient',
'unknown user'
]
error_lower = error_msg.lower()
return any(indicator in error_lower for indicator in permanent_indicators)
def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
"""
Sendet E-Mail via SMTP an EINEN Empfänger
Returns: (success: bool, error: str or None, is_permanent: bool)
"""
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
smtp.ehlo()
# STARTTLS falls konfiguriert
if SMTP_USE_TLS:
try:
smtp.starttls()
smtp.ehlo()
except Exception as e:
log(f" STARTTLS failed: {e}", 'WARNING')
# Authentication falls konfiguriert
if SMTP_USER and SMTP_PASS:
try:
smtp.login(SMTP_USER, SMTP_PASS)
except Exception as e:
log(f" SMTP auth failed: {e}", 'WARNING')
# E-Mail senden
result = smtp.sendmail(from_addr, [recipient], raw_message)
# Result auswerten
if isinstance(result, dict) and result:
# Empfänger wurde abgelehnt
error = result.get(recipient, 'Unknown refusal')
is_permanent = is_permanent_recipient_error(str(error))
log(f"{recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
return False, str(error), is_permanent
else:
# Erfolgreich
log(f"{recipient}: Delivered", 'SUCCESS')
return True, None, False
except smtplib.SMTPException as e:
error_msg = str(e)
is_permanent = is_permanent_recipient_error(error_msg)
log(f"{recipient}: SMTP error - {error_msg}", 'ERROR')
return False, error_msg, is_permanent
except Exception as e:
# Connection errors sind immer temporär
log(f"{recipient}: Connection error - {e}", 'ERROR')
return False, str(e), False
def extract_body_parts(parsed):
"""
Extrahiert sowohl text/plain als auch text/html Body-Parts.
Returns: (text_body: str, html_body: str or None)
"""
text_body = ''
html_body = None
if parsed.is_multipart():
for part in parsed.walk():
content_type = part.get_content_type()
if content_type == 'text/plain':
try:
text_body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception as e:
log(f"⚠ Error decoding text/plain part: {e}", 'WARNING')
elif content_type == 'text/html':
try:
html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
except Exception as e:
log(f"⚠ Error decoding text/html part: {e}", 'WARNING')
else:
try:
payload = parsed.get_payload(decode=True)
if payload:
decoded = payload.decode('utf-8', errors='ignore')
if parsed.get_content_type() == 'text/html':
html_body = decoded
else:
text_body = decoded
except Exception as e:
log(f"⚠ Error decoding non-multipart body: {e}", 'WARNING')
text_body = str(parsed.get_payload())
return text_body.strip() if text_body else '(No body content)', html_body
def create_ooo_reply(original_parsed, recipient, ooo_msg, content_type='text'):
"""
Erstellt eine Out-of-Office Reply als komplette MIME-Message.
Behält Original-Body (text + html) bei.
"""
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
text_body, html_body = extract_body_parts(original_parsed)
original_subject = original_parsed.get('Subject', '(no subject)')
original_from = original_parsed.get('From', 'unknown')
# Neue Message erstellen
msg = MIMEMultipart('mixed')
msg['From'] = recipient
msg['To'] = original_from
msg['Subject'] = f"Out of Office: {original_subject}"
msg['Date'] = formatdate(localtime=True)
msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
msg['In-Reply-To'] = original_parsed.get('Message-ID', '')
msg['References'] = original_parsed.get('Message-ID', '')
msg['Auto-Submitted'] = 'auto-replied' # Verhindert Loops
# Body-Teil erstellen
body_part = MIMEMultipart('alternative')
# Text-Version
text_content = f"{ooo_msg}\n\n"
text_content += "--- Original Message ---\n"
text_content += f"From: {original_from}\n"
text_content += f"Subject: {original_subject}\n\n"
text_content += text_body
body_part.attach(MIMEText(text_content, 'plain', 'utf-8'))
# HTML-Version (wenn gewünscht und Original vorhanden)
if content_type == 'html' or html_body:
html_content = f"<div>{ooo_msg}</div><br><hr><br>"
html_content += "<blockquote style='margin:10px 0;padding:10px;border-left:3px solid #ccc;'>"
html_content += f"<strong>Original Message</strong><br>"
html_content += f"<strong>From:</strong> {original_from}<br>"
html_content += f"<strong>Subject:</strong> {original_subject}<br><br>"
html_content += (html_body if html_body else text_body.replace('\n', '<br>'))
html_content += "</blockquote>"
body_part.attach(MIMEText(html_content, 'html', 'utf-8'))
msg.attach(body_part)
return msg
def create_forward_message(original_parsed, recipient, forward_to, original_from):
"""
Erstellt eine Forward-Message als komplette MIME-Message.
Behält ALLE Original-Parts inkl. Attachments bei.
"""
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
original_subject = original_parsed.get('Subject', '(no subject)')
original_date = original_parsed.get('Date', 'unknown')
# Neue Message erstellen
msg = MIMEMultipart('mixed')
msg['From'] = recipient
msg['To'] = forward_to
msg['Subject'] = f"FWD: {original_subject}"
msg['Date'] = formatdate(localtime=True)
msg['Message-ID'] = make_msgid(domain=recipient.split('@')[1])
msg['Reply-To'] = original_from
# Forward-Header als Text
text_body, html_body = extract_body_parts(original_parsed)
# Body-Teil
body_part = MIMEMultipart('alternative')
# Text-Version
fwd_text = "---------- Forwarded message ---------\n"
fwd_text += f"From: {original_from}\n"
fwd_text += f"Date: {original_date}\n"
fwd_text += f"Subject: {original_subject}\n"
fwd_text += f"To: {recipient}\n\n"
fwd_text += text_body
body_part.attach(MIMEText(fwd_text, 'plain', 'utf-8'))
# HTML-Version
if html_body:
fwd_html = "<div style='border-left:3px solid #ccc;padding-left:10px;margin:10px 0;'>"
fwd_html += "<strong>---------- Forwarded message ---------</strong><br>"
fwd_html += f"<strong>From:</strong> {original_from}<br>"
fwd_html += f"<strong>Date:</strong> {original_date}<br>"
fwd_html += f"<strong>Subject:</strong> {original_subject}<br>"
fwd_html += f"<strong>To:</strong> {recipient}<br><br>"
fwd_html += html_body
fwd_html += "</div>"
body_part.attach(MIMEText(fwd_html, 'html', 'utf-8'))
msg.attach(body_part)
# WICHTIG: Attachments kopieren
if original_parsed.is_multipart():
for part in original_parsed.walk():
# Nur non-body parts (Attachments)
if part.get_content_maintype() == 'multipart':
continue
if part.get_content_type() in ['text/plain', 'text/html']:
continue # Body bereits oben behandelt
# Attachment hinzufügen
msg.attach(part)
return msg
# ==========================================
# HAUPTFUNKTION: PROCESS MESSAGE
# ==========================================
def process_message(message_body: dict, receive_count: int) -> bool:
"""
Verarbeitet eine E-Mail aus der Queue (SNS-wrapped SES Notification)
Returns: True (Erfolg/Löschen), False (Retry/Behalten)
"""
try:
# 1. UNPACKING (SNS -> SES)
# SQS Body ist JSON. Darin ist meist 'Type': 'Notification' und 'Message': '...JSONString...'
if 'Message' in message_body and 'Type' in message_body:
# Es ist eine SNS Notification
sns_content = message_body['Message']
if isinstance(sns_content, str):
ses_msg = json.loads(sns_content)
else:
ses_msg = sns_content
else:
# Fallback: Vielleicht doch direkt SES (Legacy support)
ses_msg = message_body
# 2. DATEN EXTRAHIEREN
mail = ses_msg.get('mail', {})
receipt = ses_msg.get('receipt', {})
message_id = mail.get('messageId') # Das ist der S3 Key!
# FIX: Amazon SES Setup Notification ignorieren
if message_id == "AMAZON_SES_SETUP_NOTIFICATION":
log(" Received Amazon SES Setup Notification. Ignoring.", 'INFO')
return True # Erfolgreich (löschen), da kein Fehler
from_addr = mail.get('source')
recipients = receipt.get('recipients', [])
# S3 Key Validation
if not message_id:
log("❌ Error: No messageId in event payload", 'ERROR')
return True # Löschen, da unbrauchbar
# Domain Validation
# Wir nehmen den ersten Empfänger um die Domain zu prüfen
if recipients:
first_recipient = recipients[0]
domain = first_recipient.split('@')[1]
if domain.lower() != WORKER_DOMAIN.lower():
log(f"⚠ Security: Ignored message for {domain} (I am worker for {WORKER_DOMAIN})", 'WARNING')
return True # Löschen, gehört nicht hierher
else:
log("⚠ Warning: No recipients in event", 'WARNING')
return True
# Bucket Name ableiten
bucket = get_bucket_name(WORKER_DOMAIN)
key = message_id
log(f"\n{'='*70}")
log(f"Processing Email (SNS/SES):")
log(f" ID: {key}")
log(f" Recipients: {len(recipients)} -> {recipients}")
log(f" Bucket: {bucket}")
# 3. LADEN AUS S3
try:
response = s3.get_object(Bucket=bucket, Key=key)
raw_bytes = response['Body'].read()
log(f"✓ Loaded {len(raw_bytes)} bytes from S3")
except s3.exceptions.NoSuchKey:
# Race Condition: SNS war schneller als S3.
# Wir geben False zurück, damit SQS es in 30s nochmal versucht.
if receive_count < 5:
log(f"⏳ S3 Object not found yet (Attempt {receive_count}). Retrying...", 'WARNING')
return False
else:
log(f"❌ S3 Object missing permanently after retries.", 'ERROR')
return True # Löschen
except Exception as e:
log(f"❌ S3 Download Error: {e}", 'ERROR')
return False # Retry
# 4. PARSING & BOUNCE LOGIC
try:
parsed = BytesParser(policy=SMTPPolicy).parsebytes(raw_bytes)
subject = parsed.get('Subject', '(no subject)')
# Hier passiert die Magie: Bounce Header umschreiben
parsed, modified = apply_bounce_logic(parsed, subject)
if modified:
log(" ✨ Bounce detected & headers rewritten via DynamoDB")
# Wir arbeiten mit den modifizierten Bytes weiter
raw_bytes = parsed.as_bytes()
from_addr_final = parsed.get('From') # Neuer Absender für SMTP Envelope
else:
from_addr_final = from_addr # Original Envelope Sender
except Exception as e:
log(f"⚠ Parsing/Logic Error: {e}. Sending original.", 'WARNING')
from_addr_final = from_addr
# 5. OOO & FORWARD LOGIC (neu, vor SMTP-Versand)
if rules_table and not is_ses_bounce_notification(parsed):
for recipient in recipients:
try:
rule = rules_table.get_item(Key={'email_address': recipient}).get('Item', {})
# OOO handling
if rule.get('ooo_active', False):
ooo_msg = rule.get('ooo_message', 'Default OOO message.')
content_type = rule.get('ooo_content_type', 'text')
sender = parsed.get('From')
try:
# Erstelle komplette MIME-Message
ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type)
# Sende via send_raw_email (unterstützt komplexe MIME)
ses.send_raw_email(
Source=recipient,
Destinations=[sender],
RawMessage={'Data': ooo_reply.as_bytes()}
)
log(f"✓ Sent OOO reply to {sender} from {recipient}")
except ClientError as e:
error_code = e.response['Error']['Code']
log(f"⚠ SES OOO send failed ({error_code}): {e}", 'ERROR')
# Forward handling
forwards = rule.get('forwards', [])
if forwards:
original_from = parsed.get('From')
for forward_to in forwards:
try:
# Erstelle komplette Forward-Message mit Attachments
fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from)
# Sende via send_raw_email
ses.send_raw_email(
Source=recipient,
Destinations=[forward_to],
RawMessage={'Data': fwd_msg.as_bytes()}
)
log(f"✓ Forwarded to {forward_to} from {recipient} (original: {original_from})")
except ClientError as e:
error_code = e.response['Error']['Code']
log(f"⚠ SES forward failed to {forward_to} ({error_code}): {e}", 'ERROR')
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'MessageRejected':
log(f"⚠ SES rejected send for {recipient}: Check verification/quotas.", 'ERROR')
elif error_code == 'AccessDenied':
log(f"⚠ SES AccessDenied for {recipient}: Check IAM policy.", 'ERROR')
else:
log(f"⚠ SES error for {recipient}: {e}", 'ERROR')
except Exception as e:
log(f"⚠ Rule processing error for {recipient}: {e}", 'WARNING')
traceback.print_exc()
# 6. SMTP VERSAND (Loop über Recipients)
log(f"📤 Sending to {len(recipients)} recipient(s)...")
successful = []
failed_permanent = []
failed_temporary = []
for recipient in recipients:
# Wir nutzen raw_bytes (ggf. modifiziert)
# WICHTIG: Als Envelope Sender nutzen wir 'from_addr_final'
# (bei Bounces ist das der Original-Empfänger, sonst der SES Sender)
success, error, is_perm = send_email(from_addr_final, recipient, raw_bytes)
if success:
successful.append(recipient)
elif is_perm:
failed_permanent.append(recipient)
else:
failed_temporary.append(recipient)
# 6. RESULTAT & CLEANUP
log(f"📊 Results: {len(successful)} OK, {len(failed_temporary)} TempFail, {len(failed_permanent)} PermFail")
if len(successful) > 0:
# Mindestens einer durchgegangen -> Erfolg
mark_as_processed(bucket, key, failed_permanent if failed_permanent else None)
log(f"✅ Success. Deleted from queue.")
return True
elif len(failed_permanent) == len(recipients):
# Alle permanent fehlgeschlagen (User unknown) -> Löschen
mark_as_all_invalid(bucket, key, failed_permanent)
log(f"🛑 All recipients invalid. Deleted from queue.")
return True
else:
# Temporäre Fehler -> Retry
log(f"🔄 Temporary failures. Keeping in queue.")
return False
except Exception as e:
log(f"❌ CRITICAL WORKER ERROR: {e}", 'ERROR')
traceback.print_exc()
return False # Retry (außer es crasht immer wieder)
def main_loop():
"""Hauptschleife: Pollt SQS Queue und verarbeitet Nachrichten"""
# Queue URL ermitteln
try:
queue_url = get_queue_url()
except Exception as e:
log(f"FATAL: {e}", 'ERROR')
sys.exit(1)
log(f"\n{'='*70}")
log(f"🚀 Email Worker started")
log(f"{'='*70}")
log(f" Worker Name: {WORKER_NAME}")
log(f" Domain: {WORKER_DOMAIN}")
log(f" Queue: {queue_url}")
log(f" Region: {AWS_REGION}")
log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
log(f" Poll interval: {POLL_INTERVAL}s")
log(f" Max messages per poll: {MAX_MESSAGES}")
log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
log(f"{'='*70}\n")
consecutive_errors = 0
max_consecutive_errors = 10
messages_processed = 0
last_activity = time.time()
while not shutdown_requested:
try:
# Messages aus Queue holen (Long Polling)
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=MAX_MESSAGES,
WaitTimeSeconds=POLL_INTERVAL,
VisibilityTimeout=VISIBILITY_TIMEOUT,
AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
MessageAttributeNames=['All']
)
# Reset error counter bei erfolgreicher Abfrage
consecutive_errors = 0
if 'Messages' not in response:
# Keine Nachrichten
if time.time() - last_activity > 60:
log(f"Waiting for messages... (processed: {messages_processed})")
last_activity = time.time()
continue
message_count = len(response['Messages'])
log(f"\n✉ Received {message_count} message(s) from queue")
last_activity = time.time()
# Messages verarbeiten
for msg in response['Messages']:
if shutdown_requested:
log("Shutdown requested, stopping processing")
break
receipt_handle = msg['ReceiptHandle']
# Receive Count auslesen
receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))
# Sent Timestamp (für Queue-Zeit-Berechnung)
sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
if queue_time > 0:
log(f"Message was in queue for {queue_time}s")
try:
message_body = json.loads(msg['Body'])
# E-Mail verarbeiten
success = process_message(message_body, receive_count)
if success:
# Message aus Queue löschen
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
log("✓ Message deleted from queue")
messages_processed += 1
else:
# Bei Fehler bleibt Message in Queue
log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")
except json.JSONDecodeError as e:
log(f"✗ Invalid message format: {e}", 'ERROR')
# Ungültige Messages löschen (nicht retryable)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
except Exception as e:
log(f"✗ Error processing message: {e}", 'ERROR')
traceback.print_exc()
# Message bleibt in Queue für Retry
except KeyboardInterrupt:
log("\n⚠ Keyboard interrupt received")
break
except Exception as e:
consecutive_errors += 1
log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
traceback.print_exc()
if consecutive_errors >= max_consecutive_errors:
log("Too many consecutive errors, shutting down", 'ERROR')
break
# Kurze Pause bei Fehlern
time.sleep(5)
log(f"\n{'='*70}")
log(f"👋 Worker shutting down")
log(f" Messages processed: {messages_processed}")
log(f"{'='*70}\n")
if __name__ == '__main__':
# Validierung
if not WORKER_DOMAIN:
log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
sys.exit(1)
try:
main_loop()
except Exception as e:
log(f"Fatal error: {e}", 'ERROR')
traceback.print_exc()
sys.exit(1)

View File

@@ -0,0 +1,520 @@
import os
import sys
import boto3
import smtplib
import json
import time
import traceback
import signal
from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime
# AWS Configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)
# ✨ Worker Configuration (domain-spezifisch)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN') # z.B. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')
# Worker Settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
MAX_MESSAGES = int(os.environ.get('MAX_MESSAGES', '10'))
VISIBILITY_TIMEOUT = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
# SMTP Configuration (einfach, da nur 1 Domain pro Worker)
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '25'))
SMTP_USE_TLS = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
SMTP_USER = os.environ.get('SMTP_USER')
SMTP_PASS = os.environ.get('SMTP_PASS')
# Graceful shutdown
shutdown_requested = False
def signal_handler(signum, frame):
global shutdown_requested
print(f"\n⚠ Shutdown signal received (signal {signum})")
shutdown_requested = True
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def log(message: str, level: str = 'INFO'):
"""Structured logging with timestamp"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)
def domain_to_queue_name(domain: str) -> str:
"""Konvertiert Domain zu SQS Queue Namen"""
return domain.replace('.', '-') + '-queue'
def get_queue_url() -> str:
"""Ermittelt Queue-URL für die konfigurierte Domain"""
queue_name = domain_to_queue_name(WORKER_DOMAIN)
try:
response = sqs.get_queue_url(QueueName=queue_name)
return response['QueueUrl']
except Exception as e:
raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}")
def mark_as_processed(bucket: str, key: str, invalid_inboxes: list = None):
"""
Markiert E-Mail als erfolgreich zugestellt
Wird nur aufgerufen wenn mindestens 1 Recipient erfolgreich war
"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = WORKER_NAME
metadata['status'] = 'delivered'
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
# Invalid inboxes speichern falls vorhanden
if invalid_inboxes:
metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
log(f"⚠ Invalid inboxes recorded: {', '.join(invalid_inboxes)}", 'WARNING')
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✓ Marked s3://{bucket}/{key} as processed", 'SUCCESS')
except Exception as e:
log(f"Failed to mark as processed: {e}", 'WARNING')
def mark_as_all_invalid(bucket: str, key: str, invalid_inboxes: list):
"""
Markiert E-Mail als fehlgeschlagen weil alle Recipients ungültig sind
"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['processed'] = 'true'
metadata['processed_at'] = str(int(time.time()))
metadata['processed_by'] = WORKER_NAME
metadata['status'] = 'failed'
metadata['error'] = 'All recipients are invalid (mailboxes do not exist)'
metadata['invalid_inboxes'] = ','.join(invalid_inboxes)
metadata.pop('processing_started', None)
metadata.pop('queued_at', None)
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✓ Marked s3://{bucket}/{key} as failed (all invalid)", 'SUCCESS')
except Exception as e:
log(f"Failed to mark as all invalid: {e}", 'WARNING')
def mark_as_failed(bucket: str, key: str, error: str, receive_count: int):
"""
Markiert E-Mail als komplett fehlgeschlagen
Wird nur aufgerufen wenn ALLE Recipients fehlschlagen
"""
try:
head = s3.head_object(Bucket=bucket, Key=key)
metadata = head.get('Metadata', {}) or {}
metadata['status'] = 'failed'
metadata['failed_at'] = str(int(time.time()))
metadata['failed_by'] = WORKER_NAME
metadata['error'] = error[:500] # S3 Metadata limit
metadata['retry_count'] = str(receive_count)
metadata.pop('processing_started', None)
s3.copy_object(
Bucket=bucket,
Key=key,
CopySource={'Bucket': bucket, 'Key': key},
Metadata=metadata,
MetadataDirective='REPLACE'
)
log(f"✗ Marked s3://{bucket}/{key} as failed: {error[:100]}", 'ERROR')
except Exception as e:
log(f"Failed to mark as failed: {e}", 'WARNING')
def is_temporary_smtp_error(error_msg: str) -> bool:
"""
Prüft ob SMTP-Fehler temporär ist (Retry sinnvoll)
4xx Codes = temporär, 5xx = permanent
"""
temporary_indicators = [
'421', # Service not available
'450', # Mailbox unavailable
'451', # Local error
'452', # Insufficient storage
'4', # Generisch 4xx
'timeout',
'connection refused',
'connection reset',
'network unreachable',
'temporarily',
'try again'
]
error_lower = error_msg.lower()
return any(indicator in error_lower for indicator in temporary_indicators)
def is_permanent_recipient_error(error_msg: str) -> bool:
"""
Prüft ob Fehler permanent für diesen Recipient ist (Inbox existiert nicht)
550 = Mailbox not found, 551 = User not local, 553 = Mailbox name invalid
"""
permanent_indicators = [
'550', # Mailbox unavailable / not found
'551', # User not local
'553', # Mailbox name not allowed / invalid
'mailbox not found',
'user unknown',
'no such user',
'recipient rejected',
'does not exist',
'invalid recipient',
'unknown user'
]
error_lower = error_msg.lower()
return any(indicator in error_lower for indicator in permanent_indicators)
def send_email(from_addr: str, recipient: str, raw_message: bytes) -> tuple:
"""
Sendet E-Mail via SMTP an EINEN Empfänger
Returns: (success: bool, error: str or None, is_permanent: bool)
"""
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
smtp.ehlo()
# STARTTLS falls konfiguriert
if SMTP_USE_TLS:
try:
smtp.starttls()
smtp.ehlo()
except Exception as e:
log(f" STARTTLS failed: {e}", 'WARNING')
# Authentication falls konfiguriert
if SMTP_USER and SMTP_PASS:
try:
smtp.login(SMTP_USER, SMTP_PASS)
except Exception as e:
log(f" SMTP auth failed: {e}", 'WARNING')
# E-Mail senden
result = smtp.sendmail(from_addr, [recipient], raw_message)
# Result auswerten
if isinstance(result, dict) and result:
# Empfänger wurde abgelehnt
error = result.get(recipient, 'Unknown refusal')
is_permanent = is_permanent_recipient_error(str(error))
log(f"{recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR')
return False, str(error), is_permanent
else:
# Erfolgreich
log(f"{recipient}: Delivered", 'SUCCESS')
return True, None, False
except smtplib.SMTPException as e:
error_msg = str(e)
is_permanent = is_permanent_recipient_error(error_msg)
log(f"{recipient}: SMTP error - {error_msg}", 'ERROR')
return False, error_msg, is_permanent
except Exception as e:
# Connection errors sind immer temporär
log(f"{recipient}: Connection error - {e}", 'ERROR')
return False, str(e), False
def process_message(message_body: dict, receive_count: int) -> bool:
"""
Verarbeitet eine E-Mail aus der Queue
Kann mehrere Recipients haben - sendet an alle
Returns: True wenn erfolgreich (Message löschen), False bei Fehler (Retry)
"""
bucket = message_body['bucket']
key = message_body['key']
from_addr = message_body['from']
recipients = message_body['recipients'] # Liste von Empfängern
domain = message_body['domain']
subject = message_body.get('subject', '(unknown)')
message_id = message_body.get('message_id', '(unknown)')
log(f"\n{'='*70}")
log(f"Processing email (Attempt #{receive_count}):")
log(f" MessageId: {message_id}")
log(f" S3 Key: {key}")
log(f" Domain: {domain}")
log(f" From: {from_addr}")
log(f" Recipients: {len(recipients)}")
for recipient in recipients:
log(f" - {recipient}")
log(f" Subject: {subject}")
log(f" S3: s3://{bucket}/{key}")
log(f"{'='*70}")
# ✨ VALIDATION: Domain muss mit Worker-Domain übereinstimmen
if domain.lower() != WORKER_DOMAIN.lower():
log(f"ERROR: Wrong domain! Expected {WORKER_DOMAIN}, got {domain}", 'ERROR')
log("This message should not be in this queue! Deleting...", 'ERROR')
return True # Message löschen (gehört nicht hierher)
# E-Mail aus S3 laden
try:
response = s3.get_object(Bucket=bucket, Key=key)
raw_bytes = response['Body'].read()
log(f"✓ Loaded {len(raw_bytes):,} bytes ({len(raw_bytes)/1024:.1f} KB)")
except s3.exceptions.NoSuchKey:
log(f"✗ S3 object not found (may have been deleted)", 'ERROR')
return True # Nicht retryable - Message löschen
except Exception as e:
log(f"✗ Failed to load from S3: {e}", 'ERROR')
return False # Könnte temporär sein - retry
# An alle Recipients senden
log(f"\n📤 Sending to {len(recipients)} recipient(s)...")
log(f"Connecting to {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
successful = []
failed_temporary = []
failed_permanent = []
for recipient in recipients:
success, error, is_permanent = send_email(from_addr, recipient, raw_bytes)
if success:
successful.append(recipient)
elif is_permanent:
failed_permanent.append(recipient)
else:
failed_temporary.append(recipient)
# Ergebnis-Zusammenfassung
log(f"\n📊 Delivery Results:")
log(f" ✓ Successful: {len(successful)}/{len(recipients)}")
log(f" ✗ Failed (temporary): {len(failed_temporary)}")
log(f" ✗ Failed (permanent): {len(failed_permanent)}")
# Entscheidungslogik
if len(successful) > 0:
# ✅ Fall 1: Mindestens 1 Recipient erfolgreich
# → status=delivered, invalid_inboxes tracken
invalid_inboxes = failed_permanent if failed_permanent else None
mark_as_processed(bucket, key, invalid_inboxes)
log(f"{'='*70}")
log(f"✅ Email delivered to {len(successful)} recipient(s)", 'SUCCESS')
if failed_permanent:
log(f"{len(failed_permanent)} invalid inbox(es): {', '.join(failed_permanent)}", 'WARNING')
if failed_temporary:
log(f"{len(failed_temporary)} temporary failure(s) - NOT retrying (at least 1 success)", 'WARNING')
log(f"{'='*70}\n")
return True # Message löschen
elif len(failed_permanent) == len(recipients):
# ❌ Fall 2: ALLE Recipients permanent fehlgeschlagen (alle Inboxen ungültig)
# → status=failed, invalid_inboxes = ALLE
mark_as_all_invalid(bucket, key, failed_permanent)
log(f"{'='*70}")
log(f"✗ All recipients are invalid inboxes - NO delivery", 'ERROR')
log(f" Invalid: {', '.join(failed_permanent)}", 'ERROR')
log(f"{'='*70}\n")
return True # Message löschen (nicht retryable)
else:
# ⏳ Fall 3: Nur temporäre Fehler, keine erfolgreichen Deliveries
# → Retry wenn noch Versuche übrig
if receive_count < 3:
log(f"⚠ All failures are temporary, will retry", 'WARNING')
log(f"{'='*70}\n")
return False # Message NICHT löschen → Retry
else:
# Max retries erreicht → als failed markieren
error_summary = f"Failed after {receive_count} attempts. Temporary errors for all recipients."
mark_as_failed(bucket, key, error_summary, receive_count)
log(f"{'='*70}")
log(f"✗ Email delivery failed permanently after {receive_count} attempts", 'ERROR')
log(f"{'='*70}\n")
return False # Nach 3 Versuchen → automatisch DLQ
def main_loop():
"""Hauptschleife: Pollt SQS Queue und verarbeitet Nachrichten"""
# Queue URL ermitteln
try:
queue_url = get_queue_url()
except Exception as e:
log(f"FATAL: {e}", 'ERROR')
sys.exit(1)
log(f"\n{'='*70}")
log(f"🚀 Email Worker started")
log(f"{'='*70}")
log(f" Worker Name: {WORKER_NAME}")
log(f" Domain: {WORKER_DOMAIN}")
log(f" Queue: {queue_url}")
log(f" Region: {AWS_REGION}")
log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
log(f" Poll interval: {POLL_INTERVAL}s")
log(f" Max messages per poll: {MAX_MESSAGES}")
log(f" Visibility timeout: {VISIBILITY_TIMEOUT}s")
log(f"{'='*70}\n")
consecutive_errors = 0
max_consecutive_errors = 10
messages_processed = 0
last_activity = time.time()
while not shutdown_requested:
try:
# Messages aus Queue holen (Long Polling)
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=MAX_MESSAGES,
WaitTimeSeconds=POLL_INTERVAL,
VisibilityTimeout=VISIBILITY_TIMEOUT,
AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
MessageAttributeNames=['All']
)
# Reset error counter bei erfolgreicher Abfrage
consecutive_errors = 0
if 'Messages' not in response:
# Keine Nachrichten
if time.time() - last_activity > 60:
log(f"Waiting for messages... (processed: {messages_processed})")
last_activity = time.time()
continue
message_count = len(response['Messages'])
log(f"\n✉ Received {message_count} message(s) from queue")
last_activity = time.time()
# Messages verarbeiten
for msg in response['Messages']:
if shutdown_requested:
log("Shutdown requested, stopping processing")
break
receipt_handle = msg['ReceiptHandle']
# Receive Count auslesen
receive_count = int(msg.get('Attributes', {}).get('ApproximateReceiveCount', 1))
# Sent Timestamp (für Queue-Zeit-Berechnung)
sent_timestamp = int(msg.get('Attributes', {}).get('SentTimestamp', 0)) / 1000
queue_time = int(time.time() - sent_timestamp) if sent_timestamp else 0
if queue_time > 0:
log(f"Message was in queue for {queue_time}s")
try:
message_body = json.loads(msg['Body'])
# E-Mail verarbeiten
success = process_message(message_body, receive_count)
if success:
# Message aus Queue löschen
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
log("✓ Message deleted from queue")
messages_processed += 1
else:
# Bei Fehler bleibt Message in Queue
log(f"⚠ Message kept in queue for retry (attempt {receive_count}/3)")
except json.JSONDecodeError as e:
log(f"✗ Invalid message format: {e}", 'ERROR')
# Ungültige Messages löschen (nicht retryable)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
except Exception as e:
log(f"✗ Error processing message: {e}", 'ERROR')
traceback.print_exc()
# Message bleibt in Queue für Retry
except KeyboardInterrupt:
log("\n⚠ Keyboard interrupt received")
break
except Exception as e:
consecutive_errors += 1
log(f"✗ Error in main loop ({consecutive_errors}/{max_consecutive_errors}): {e}", 'ERROR')
traceback.print_exc()
if consecutive_errors >= max_consecutive_errors:
log("Too many consecutive errors, shutting down", 'ERROR')
break
# Kurze Pause bei Fehlern
time.sleep(5)
log(f"\n{'='*70}")
log(f"👋 Worker shutting down")
log(f" Messages processed: {messages_processed}")
log(f"{'='*70}\n")
if __name__ == '__main__':
# Validierung
if not WORKER_DOMAIN:
log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
sys.exit(1)
try:
main_loop()
except Exception as e:
log(f"Fatal error: {e}", 'ERROR')
traceback.print_exc()
sys.exit(1)