This commit is contained in:
2025-10-12 20:14:46 -05:00
parent 5488537df6
commit 27980ca0a1
2 changed files with 55 additions and 77 deletions

View File

@@ -1,4 +1,3 @@
# worker.py
import os
import sys
import boto3
@@ -11,15 +10,14 @@ from email.parser import BytesParser
from email.policy import SMTP as SMTPPolicy
from datetime import datetime
# AWS Clients
AWS_REGION = os.environ.get('AWS_REGION', 'eu-central-1')
# AWS Configuration
AWS_REGION = 'us-east-2'
s3 = boto3.client('s3', region_name=AWS_REGION)
sqs = boto3.client('sqs', region_name=AWS_REGION)
# ✨ Worker Configuration (domain-spezifisch)
WORKER_DOMAIN = os.environ.get('WORKER_DOMAIN') # z.B. 'andreasknuth.de'
WORKER_NAME = os.environ.get('WORKER_NAME', f'worker-{WORKER_DOMAIN}')
QUEUE_URL = os.environ.get('SQS_QUEUE_URL')
# Worker Settings
POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '20'))
@@ -53,6 +51,22 @@ def log(message: str, level: str = 'INFO'):
print(f"[{timestamp}] [{level}] [{WORKER_NAME}] {message}", flush=True)
def domain_to_queue_name(domain: str) -> str:
"""Konvertiert Domain zu SQS Queue Namen"""
return domain.replace('.', '-') + '-queue'
def get_queue_url() -> str:
"""Ermittelt Queue-URL für die konfigurierte Domain"""
queue_name = domain_to_queue_name(WORKER_DOMAIN)
try:
response = sqs.get_queue_url(QueueName=queue_name)
return response['QueueUrl']
except Exception as e:
raise Exception(f"Failed to get queue URL for {WORKER_DOMAIN}: {e}")
def mark_as_processed(bucket: str, key: str):
"""Markiert E-Mail als erfolgreich zugestellt"""
try:
@@ -261,12 +275,19 @@ def process_message(message_body: dict, receive_count: int) -> bool:
def main_loop():
"""Hauptschleife: Pollt SQS Queue und verarbeitet Nachrichten"""
# Queue URL ermitteln
try:
queue_url = get_queue_url()
except Exception as e:
log(f"FATAL: {e}", 'ERROR')
sys.exit(1)
log(f"\n{'='*70}")
log(f"🚀 Email Worker started")
log(f"{'='*70}")
log(f" Worker Name: {WORKER_NAME}")
log(f" Domain: {WORKER_DOMAIN}")
log(f" Queue: {QUEUE_URL}")
log(f" Queue: {queue_url}")
log(f" Region: {AWS_REGION}")
log(f" SMTP: {SMTP_HOST}:{SMTP_PORT} (TLS: {SMTP_USE_TLS})")
log(f" Poll interval: {POLL_INTERVAL}s")
@@ -283,7 +304,7 @@ def main_loop():
try:
# Messages aus Queue holen (Long Polling)
response = sqs.receive_message(
QueueUrl=QUEUE_URL,
QueueUrl=queue_url,
MaxNumberOfMessages=MAX_MESSAGES,
WaitTimeSeconds=POLL_INTERVAL,
VisibilityTimeout=VISIBILITY_TIMEOUT,
@@ -332,7 +353,7 @@ def main_loop():
if success:
# Message aus Queue löschen
sqs.delete_message(
QueueUrl=QUEUE_URL,
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
log("✓ Message deleted from queue")
@@ -345,7 +366,7 @@ def main_loop():
log(f"✗ Invalid message format: {e}", 'ERROR')
# Ungültige Messages löschen (nicht retryable)
sqs.delete_message(
QueueUrl=QUEUE_URL,
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
@@ -382,10 +403,6 @@ if __name__ == '__main__':
log("ERROR: WORKER_DOMAIN not set!", 'ERROR')
sys.exit(1)
if not QUEUE_URL:
log("ERROR: SQS_QUEUE_URL not set!", 'ERROR')
sys.exit(1)
try:
main_loop()
except Exception as e: