modular worker
This commit is contained in:
201
unified-worker/email-worker/unified_worker.py
Normal file
201
unified-worker/email-worker/unified_worker.py
Normal file
@@ -0,0 +1,201 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unified Worker - coordinates all domain pollers
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
from typing import List, Dict
|
||||
|
||||
from logger import log
|
||||
from config import config, load_domains
|
||||
from aws import S3Handler, SQSHandler, SESHandler, DynamoDBHandler
|
||||
from smtp import SMTPPool
|
||||
from smtp.delivery import EmailDelivery
|
||||
from worker import MessageProcessor
|
||||
from domain_poller import DomainPoller
|
||||
from metrics.prometheus import MetricsCollector
|
||||
|
||||
|
||||
class UnifiedWorker:
|
||||
"""Main worker coordinating all domain pollers"""
|
||||
|
||||
def __init__(self):
|
||||
self.stop_event = threading.Event()
|
||||
self.domains: List[str] = []
|
||||
self.queue_urls: Dict[str, str] = {}
|
||||
self.poller_threads: List[threading.Thread] = []
|
||||
|
||||
# Shared stats across all pollers
|
||||
self.domain_stats: Dict[str, int] = {} # domain -> processed count
|
||||
self.stats_lock = threading.Lock()
|
||||
|
||||
# AWS handlers
|
||||
self.s3 = S3Handler()
|
||||
self.sqs = SQSHandler()
|
||||
self.ses = SESHandler()
|
||||
self.dynamodb = DynamoDBHandler()
|
||||
|
||||
# SMTP pool
|
||||
self.smtp_pool = SMTPPool(config.smtp_host, config.smtp_port, config.smtp_pool_size)
|
||||
|
||||
# Email delivery
|
||||
self.delivery = EmailDelivery(self.smtp_pool)
|
||||
|
||||
# Metrics
|
||||
self.metrics: MetricsCollector = None
|
||||
|
||||
# Message processor
|
||||
self.processor = MessageProcessor(
|
||||
self.s3,
|
||||
self.sqs,
|
||||
self.ses,
|
||||
self.dynamodb,
|
||||
self.delivery,
|
||||
None # Metrics will be set later
|
||||
)
|
||||
|
||||
def setup(self):
|
||||
"""Initialize worker"""
|
||||
self.domains = load_domains()
|
||||
|
||||
if not self.domains:
|
||||
log("❌ No domains configured!", 'ERROR')
|
||||
sys.exit(1)
|
||||
|
||||
# Get queue URLs
|
||||
for domain in self.domains:
|
||||
url = self.sqs.get_queue_url(domain)
|
||||
if url:
|
||||
self.queue_urls[domain] = url
|
||||
log(f" ✓ {domain} -> queue found")
|
||||
else:
|
||||
log(f" ✗ {domain} -> Queue not found!", 'WARNING')
|
||||
|
||||
if not self.queue_urls:
|
||||
log("❌ No valid queues found!", 'ERROR')
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize SMTP pool
|
||||
self.smtp_pool.initialize()
|
||||
|
||||
log(f"Initialized with {len(self.queue_urls)} domains")
|
||||
|
||||
def start(self):
|
||||
"""Start all domain pollers"""
|
||||
# Initialize stats for all domains
|
||||
for domain in self.queue_urls.keys():
|
||||
self.domain_stats[domain] = 0
|
||||
|
||||
# Create poller for each domain
|
||||
for domain, queue_url in self.queue_urls.items():
|
||||
poller = DomainPoller(
|
||||
domain=domain,
|
||||
queue_url=queue_url,
|
||||
message_processor=self.processor,
|
||||
sqs=self.sqs,
|
||||
metrics=self.metrics,
|
||||
stop_event=self.stop_event,
|
||||
stats_dict=self.domain_stats,
|
||||
stats_lock=self.stats_lock
|
||||
)
|
||||
|
||||
thread = threading.Thread(
|
||||
target=poller.poll,
|
||||
name=f"poller-{domain}",
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
self.poller_threads.append(thread)
|
||||
|
||||
log(f"Started {len(self.poller_threads)} domain pollers")
|
||||
|
||||
# Periodic status log (every 5 minutes)
|
||||
last_status_log = time.time()
|
||||
status_interval = 300 # 5 minutes
|
||||
|
||||
try:
|
||||
while not self.stop_event.is_set():
|
||||
self.stop_event.wait(timeout=10)
|
||||
|
||||
# Log status summary every 5 minutes
|
||||
if time.time() - last_status_log > status_interval:
|
||||
self._log_status_table()
|
||||
last_status_log = time.time()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def _log_status_table(self):
|
||||
"""Log a compact status table"""
|
||||
active_threads = sum(1 for t in self.poller_threads if t.is_alive())
|
||||
|
||||
with self.stats_lock:
|
||||
total_processed = sum(self.domain_stats.values())
|
||||
|
||||
# Build compact stats: only show domains with activity or top domains
|
||||
stats_parts = []
|
||||
for domain in sorted(self.queue_urls.keys()):
|
||||
count = self.domain_stats.get(domain, 0)
|
||||
if count > 0: # Only show active domains
|
||||
# Shorten domain for display
|
||||
short_domain = domain.split('.')[0][:12]
|
||||
stats_parts.append(f"{short_domain}:{count}")
|
||||
|
||||
if stats_parts:
|
||||
stats_line = " | ".join(stats_parts)
|
||||
else:
|
||||
stats_line = "no activity"
|
||||
|
||||
log(
|
||||
f"📊 Status: {active_threads}/{len(self.poller_threads)} active, "
|
||||
f"total:{total_processed} | {stats_line}"
|
||||
)
|
||||
|
||||
def stop(self):
|
||||
"""Stop gracefully"""
|
||||
log("⚠ Stopping worker...")
|
||||
self.stop_event.set()
|
||||
|
||||
# Wait for poller threads (max 10 seconds each)
|
||||
for thread in self.poller_threads:
|
||||
thread.join(timeout=10)
|
||||
if thread.is_alive():
|
||||
log(f"Warning: {thread.name} did not stop gracefully", 'WARNING')
|
||||
|
||||
self.smtp_pool.close_all()
|
||||
log("👋 Worker stopped")
|
||||
|
||||
def set_metrics(self, metrics: MetricsCollector):
|
||||
"""Set metrics collector"""
|
||||
self.metrics = metrics
|
||||
self.processor.metrics = metrics
|
||||
|
||||
def print_startup_banner(self):
|
||||
"""Print startup information"""
|
||||
log(f"\n{'='*70}")
|
||||
log(f"🚀 UNIFIED EMAIL WORKER")
|
||||
log(f"{'='*70}")
|
||||
log(f" Domains: {len(self.queue_urls)}")
|
||||
log(f" DynamoDB: {'Connected' if self.dynamodb.available else 'Not Available'}")
|
||||
|
||||
if config.lmtp_enabled:
|
||||
log(f" Delivery: LMTP -> {config.lmtp_host}:{config.lmtp_port} (bypasses transport_maps)")
|
||||
else:
|
||||
log(f" Delivery: SMTP -> {config.smtp_host}:{config.smtp_port}")
|
||||
|
||||
log(f" Poll Interval: {config.poll_interval}s")
|
||||
log(f" Visibility: {config.visibility_timeout}s")
|
||||
log(f"")
|
||||
log(f" Features:")
|
||||
log(f" ✓ Bounce Detection & Header Rewriting")
|
||||
log(f" {'✓' if self.dynamodb.available else '✗'} Auto-Reply / Out-of-Office")
|
||||
log(f" {'✓' if self.dynamodb.available else '✗'} Email Forwarding")
|
||||
log(f" {'✓' if self.dynamodb.available else '✗'} Blocked Senders (Wildcard)")
|
||||
log(f" {'✓' if self.metrics else '✗'} Prometheus Metrics")
|
||||
log(f" {'✓' if config.lmtp_enabled else '✗'} LMTP Direct Delivery")
|
||||
log(f"")
|
||||
log(f" Active Domains:")
|
||||
for domain in sorted(self.queue_urls.keys()):
|
||||
log(f" • {domain}")
|
||||
log(f"{'='*70}\n")
|
||||
Reference in New Issue
Block a user