forward/reply solution for internal mails
This commit is contained in:
@@ -86,7 +86,7 @@ class Config:
|
||||
max_messages: int = int(os.environ.get('MAX_MESSAGES', '10'))
|
||||
visibility_timeout: int = int(os.environ.get('VISIBILITY_TIMEOUT', '300'))
|
||||
|
||||
# SMTP
|
||||
# SMTP for delivery (should use LMTP port 24 to bypass transport_maps)
|
||||
smtp_host: str = os.environ.get('SMTP_HOST', 'localhost')
|
||||
smtp_port: int = int(os.environ.get('SMTP_PORT', '25'))
|
||||
smtp_use_tls: bool = os.environ.get('SMTP_USE_TLS', 'false').lower() == 'true'
|
||||
@@ -94,6 +94,12 @@ class Config:
|
||||
smtp_pass: str = os.environ.get('SMTP_PASS', '')
|
||||
smtp_pool_size: int = int(os.environ.get('SMTP_POOL_SIZE', '5'))
|
||||
|
||||
# LMTP for local delivery (bypasses Postfix transport_maps completely)
|
||||
# Set LMTP_ENABLED=true and LMTP_PORT=24 to use Dovecot LMTP
|
||||
lmtp_enabled: bool = os.environ.get('LMTP_ENABLED', 'false').lower() == 'true'
|
||||
lmtp_host: str = os.environ.get('LMTP_HOST', 'localhost')
|
||||
lmtp_port: int = int(os.environ.get('LMTP_PORT', '24'))
|
||||
|
||||
# DynamoDB Tables
|
||||
rules_table: str = os.environ.get('DYNAMODB_RULES_TABLE', 'email-rules')
|
||||
messages_table: str = os.environ.get('DYNAMODB_MESSAGES_TABLE', 'ses-outbound-messages')
|
||||
@@ -261,6 +267,9 @@ class SMTPPool:
|
||||
|
||||
smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
|
||||
|
||||
# Global set of domains we manage (populated at startup)
|
||||
MANAGED_DOMAINS: set = set()
|
||||
|
||||
# ============================================
|
||||
# HELPER FUNCTIONS
|
||||
# ============================================
|
||||
@@ -271,6 +280,30 @@ def domain_to_queue_name(domain: str) -> str:
|
||||
def domain_to_bucket_name(domain: str) -> str:
|
||||
return domain.replace('.', '-') + '-emails'
|
||||
|
||||
def is_internal_address(email_address: str) -> bool:
|
||||
"""Check if email address belongs to one of our managed domains"""
|
||||
if '@' not in email_address:
|
||||
return False
|
||||
domain = email_address.split('@')[1].lower()
|
||||
return domain in MANAGED_DOMAINS
|
||||
|
||||
def send_internal_email(from_addr: str, to_addr: str, raw_message: bytes, worker_name: str) -> bool:
|
||||
"""
|
||||
Send email via local SMTP port 2525 (bypasses transport_maps).
|
||||
Used for internal forwards to avoid SES loop.
|
||||
Returns: True on success, False on failure
|
||||
"""
|
||||
try:
|
||||
# Direkte SMTP Verbindung auf Port 2525 (ohne transport_maps)
|
||||
with smtplib.SMTP(config.smtp_host, 2525, timeout=30) as conn:
|
||||
conn.ehlo()
|
||||
conn.sendmail(from_addr, [to_addr], raw_message)
|
||||
log(f" ✓ Internal delivery to {to_addr} (Port 2525)", 'SUCCESS', worker_name)
|
||||
return True
|
||||
except Exception as e:
|
||||
log(f" ✗ Internal delivery failed to {to_addr}: {e}", 'ERROR', worker_name)
|
||||
return False
|
||||
|
||||
def get_queue_url(domain: str) -> Optional[str]:
|
||||
queue_name = domain_to_queue_name(domain)
|
||||
try:
|
||||
@@ -284,7 +317,8 @@ def get_queue_url(domain: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
def load_domains() -> List[str]:
|
||||
"""Load domains from config"""
|
||||
"""Load domains from config and populate MANAGED_DOMAINS global"""
|
||||
global MANAGED_DOMAINS
|
||||
domains = []
|
||||
|
||||
if config.domains_list:
|
||||
@@ -298,6 +332,10 @@ def load_domains() -> List[str]:
|
||||
domains.append(domain)
|
||||
|
||||
domains = list(set(domains))
|
||||
|
||||
# Populate global set for is_internal_address() checks
|
||||
MANAGED_DOMAINS = set(d.lower() for d in domains)
|
||||
|
||||
log(f"Loaded {len(domains)} domains: {', '.join(domains)}")
|
||||
return domains
|
||||
|
||||
@@ -609,13 +647,24 @@ def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name
|
||||
else:
|
||||
try:
|
||||
ooo_reply = create_ooo_reply(parsed, recipient, ooo_msg, content_type)
|
||||
ooo_bytes = ooo_reply.as_bytes()
|
||||
|
||||
ses.send_raw_email(
|
||||
Source=recipient,
|
||||
Destinations=[sender_addr],
|
||||
RawMessage={'Data': ooo_reply.as_bytes()}
|
||||
)
|
||||
log(f"✓ Sent OOO reply to {sender_addr} from {recipient}", 'SUCCESS', worker_name)
|
||||
# Unterscheiden: Intern (Port 2525) vs Extern (SES)
|
||||
if is_internal_address(sender_addr):
|
||||
# Interne Adresse → direkt via Port 2525
|
||||
success = send_internal_email(recipient, sender_addr, ooo_bytes, worker_name)
|
||||
if success:
|
||||
log(f"✓ Sent OOO reply internally to {sender_addr}", 'SUCCESS', worker_name)
|
||||
else:
|
||||
log(f"⚠ Internal OOO reply failed to {sender_addr}", 'WARNING', worker_name)
|
||||
else:
|
||||
# Externe Adresse → via SES
|
||||
ses.send_raw_email(
|
||||
Source=recipient,
|
||||
Destinations=[sender_addr],
|
||||
RawMessage={'Data': ooo_bytes}
|
||||
)
|
||||
log(f"✓ Sent OOO reply externally to {sender_addr} via SES", 'SUCCESS', worker_name)
|
||||
|
||||
if PROMETHEUS_ENABLED:
|
||||
autoreplies_sent.labels(domain=domain).inc()
|
||||
@@ -623,6 +672,8 @@ def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name
|
||||
except ClientError as e:
|
||||
error_code = e.response['Error']['Code']
|
||||
log(f"⚠ SES OOO send failed ({error_code}): {e}", 'ERROR', worker_name)
|
||||
except Exception as e:
|
||||
log(f"⚠ OOO reply failed to {sender_addr}: {e}", 'ERROR', worker_name)
|
||||
|
||||
# ============================================
|
||||
# Forward handling
|
||||
@@ -632,13 +683,24 @@ def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name
|
||||
for forward_to in forwards:
|
||||
try:
|
||||
fwd_msg = create_forward_message(parsed, recipient, forward_to, original_from)
|
||||
fwd_bytes = fwd_msg.as_bytes()
|
||||
|
||||
ses.send_raw_email(
|
||||
Source=recipient,
|
||||
Destinations=[forward_to],
|
||||
RawMessage={'Data': fwd_msg.as_bytes()}
|
||||
)
|
||||
log(f"✓ Forwarded to {forward_to} from {recipient}", 'SUCCESS', worker_name)
|
||||
# Unterscheiden: Intern (Port 2525) vs Extern (SES)
|
||||
if is_internal_address(forward_to):
|
||||
# Interne Adresse → direkt via Port 2525 (keine Loop!)
|
||||
success = send_internal_email(recipient, forward_to, fwd_bytes, worker_name)
|
||||
if success:
|
||||
log(f"✓ Forwarded internally to {forward_to}", 'SUCCESS', worker_name)
|
||||
else:
|
||||
log(f"⚠ Internal forward failed to {forward_to}", 'WARNING', worker_name)
|
||||
else:
|
||||
# Externe Adresse → via SES
|
||||
ses.send_raw_email(
|
||||
Source=recipient,
|
||||
Destinations=[forward_to],
|
||||
RawMessage={'Data': fwd_bytes}
|
||||
)
|
||||
log(f"✓ Forwarded externally to {forward_to} via SES", 'SUCCESS', worker_name)
|
||||
|
||||
if PROMETHEUS_ENABLED:
|
||||
forwards_sent.labels(domain=domain).inc()
|
||||
@@ -646,6 +708,8 @@ def process_rules_for_recipient(recipient: str, parsed, domain: str, worker_name
|
||||
except ClientError as e:
|
||||
error_code = e.response['Error']['Code']
|
||||
log(f"⚠ SES forward failed to {forward_to} ({error_code}): {e}", 'ERROR', worker_name)
|
||||
except Exception as e:
|
||||
log(f"⚠ Forward failed to {forward_to}: {e}", 'ERROR', worker_name)
|
||||
|
||||
except ClientError as e:
|
||||
error_code = e.response['Error']['Code']
|
||||
@@ -683,26 +747,41 @@ def is_permanent_recipient_error(error_msg: str) -> bool:
|
||||
|
||||
def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes, worker_name: str, max_retries: int = 2) -> Tuple[bool, Optional[str], bool]:
|
||||
"""
|
||||
Sendet E-Mail via SMTP an EINEN Empfänger
|
||||
Mit Retry-Logik bei Connection-Fehlern
|
||||
Sendet E-Mail via SMTP/LMTP an EINEN Empfänger.
|
||||
Wenn LMTP aktiviert ist, wird direkt an Dovecot geliefert (umgeht transport_maps).
|
||||
Mit Retry-Logik bei Connection-Fehlern.
|
||||
Returns: (success: bool, error: str or None, is_permanent: bool)
|
||||
"""
|
||||
last_error = None
|
||||
|
||||
# Entscheide ob LMTP oder SMTP
|
||||
use_lmtp = config.lmtp_enabled
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
smtp_conn = smtp_pool.get_connection()
|
||||
|
||||
if not smtp_conn:
|
||||
last_error = "Could not get SMTP connection"
|
||||
log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
conn = None
|
||||
|
||||
try:
|
||||
result = smtp_conn.sendmail(from_addr, [recipient], raw_message)
|
||||
if use_lmtp:
|
||||
# LMTP Verbindung direkt zu Dovecot (umgeht Postfix/transport_maps)
|
||||
conn = smtplib.LMTP(config.lmtp_host, config.lmtp_port, timeout=30)
|
||||
# LMTP braucht kein EHLO, aber schadet nicht
|
||||
conn.ehlo()
|
||||
else:
|
||||
# Normale SMTP Verbindung aus dem Pool
|
||||
conn = smtp_pool.get_connection()
|
||||
if not conn:
|
||||
last_error = "Could not get SMTP connection"
|
||||
log(f" ⚠ {recipient}: No SMTP connection (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
|
||||
time.sleep(0.5)
|
||||
continue
|
||||
|
||||
# Connection war erfolgreich, zurück in Pool
|
||||
smtp_pool.return_connection(smtp_conn)
|
||||
result = conn.sendmail(from_addr, [recipient], raw_message)
|
||||
|
||||
# Erfolg
|
||||
if use_lmtp:
|
||||
conn.quit()
|
||||
else:
|
||||
smtp_pool.return_connection(conn)
|
||||
|
||||
if isinstance(result, dict) and result:
|
||||
error = str(result.get(recipient, 'Unknown refusal'))
|
||||
@@ -710,23 +789,30 @@ def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes,
|
||||
log(f" ✗ {recipient}: {error} ({'permanent' if is_permanent else 'temporary'})", 'ERROR', worker_name)
|
||||
return False, error, is_permanent
|
||||
else:
|
||||
log(f" ✓ {recipient}: Delivered", 'SUCCESS', worker_name)
|
||||
delivery_method = "LMTP" if use_lmtp else "SMTP"
|
||||
log(f" ✓ {recipient}: Delivered ({delivery_method})", 'SUCCESS', worker_name)
|
||||
return True, None, False
|
||||
|
||||
except smtplib.SMTPServerDisconnected as e:
|
||||
# Connection wurde geschlossen - Retry mit neuer Connection
|
||||
log(f" ⚠ {recipient}: Connection lost, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
|
||||
last_error = str(e)
|
||||
# Connection nicht zurückgeben (ist kaputt)
|
||||
try:
|
||||
smtp_conn.quit()
|
||||
except:
|
||||
pass
|
||||
if conn:
|
||||
try:
|
||||
conn.quit()
|
||||
except:
|
||||
pass
|
||||
time.sleep(0.3)
|
||||
continue
|
||||
|
||||
except smtplib.SMTPRecipientsRefused as e:
|
||||
smtp_pool.return_connection(smtp_conn)
|
||||
if conn and not use_lmtp:
|
||||
smtp_pool.return_connection(conn)
|
||||
elif conn:
|
||||
try:
|
||||
conn.quit()
|
||||
except:
|
||||
pass
|
||||
error_msg = str(e)
|
||||
is_permanent = is_permanent_recipient_error(error_msg)
|
||||
log(f" ✗ {recipient}: Recipients refused - {error_msg}", 'ERROR', worker_name)
|
||||
@@ -736,26 +822,34 @@ def send_email_to_recipient(from_addr: str, recipient: str, raw_message: bytes,
|
||||
error_msg = str(e)
|
||||
# Bei Connection-Fehlern: Retry
|
||||
if 'disconnect' in error_msg.lower() or 'closed' in error_msg.lower() or 'connection' in error_msg.lower():
|
||||
log(f" ⚠ {recipient}: SMTP connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
|
||||
log(f" ⚠ {recipient}: Connection error, retrying... (attempt {attempt + 1}/{max_retries + 1})", 'WARNING', worker_name)
|
||||
last_error = error_msg
|
||||
try:
|
||||
smtp_conn.quit()
|
||||
except:
|
||||
pass
|
||||
if conn:
|
||||
try:
|
||||
conn.quit()
|
||||
except:
|
||||
pass
|
||||
time.sleep(0.3)
|
||||
continue
|
||||
|
||||
smtp_pool.return_connection(smtp_conn)
|
||||
if conn and not use_lmtp:
|
||||
smtp_pool.return_connection(conn)
|
||||
elif conn:
|
||||
try:
|
||||
conn.quit()
|
||||
except:
|
||||
pass
|
||||
is_permanent = is_permanent_recipient_error(error_msg)
|
||||
log(f" ✗ {recipient}: SMTP error - {error_msg}", 'ERROR', worker_name)
|
||||
log(f" ✗ {recipient}: Error - {error_msg}", 'ERROR', worker_name)
|
||||
return False, error_msg, is_permanent
|
||||
|
||||
except Exception as e:
|
||||
# Unbekannter Fehler - Connection verwerfen, aber nicht permanent
|
||||
try:
|
||||
smtp_conn.quit()
|
||||
except:
|
||||
pass
|
||||
# Unbekannter Fehler
|
||||
if conn:
|
||||
try:
|
||||
conn.quit()
|
||||
except:
|
||||
pass
|
||||
log(f" ✗ {recipient}: Unexpected error - {e}", 'ERROR', worker_name)
|
||||
return False, str(e), False
|
||||
|
||||
@@ -1286,7 +1380,10 @@ def main():
|
||||
log(f"{'='*70}")
|
||||
log(f" Domains: {len(worker.queue_urls)}")
|
||||
log(f" DynamoDB: {'Connected' if DYNAMODB_AVAILABLE else 'Not Available'}")
|
||||
log(f" SMTP Pool: {config.smtp_pool_size} connections -> {config.smtp_host}:{config.smtp_port}")
|
||||
if config.lmtp_enabled:
|
||||
log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)")
|
||||
else:
|
||||
log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}")
|
||||
log(f" Poll Interval: {config.poll_interval}s")
|
||||
log(f" Visibility: {config.visibility_timeout}s")
|
||||
log(f"")
|
||||
@@ -1295,6 +1392,7 @@ def main():
|
||||
log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Auto-Reply / Out-of-Office")
|
||||
log(f" {'✓' if DYNAMODB_AVAILABLE else '✗'} Email Forwarding")
|
||||
log(f" {'✓' if PROMETHEUS_ENABLED else '✗'} Prometheus Metrics")
|
||||
log(f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery")
|
||||
log(f"")
|
||||
log(f" Active Domains:")
|
||||
for domain in sorted(worker.queue_urls.keys()):
|
||||
@@ -1304,4 +1402,4 @@ def main():
|
||||
worker.start()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
main()
|
||||
Reference in New Issue
Block a user