Compare commits

...

15 Commits

Author SHA1 Message Date
27c2be664a standby mode, sns or sqs 2026-04-03 16:54:51 -05:00
7aed24bfff Merge branch 'contabo' 2026-04-03 16:16:01 -05:00
d331bd13b5 no buffer 2026-03-11 19:38:02 -05:00
610b01eee7 whitelist helper 2026-03-11 19:26:32 -05:00
c2d4903bc9 ENABLE_FAIL2BAN 0 2026-03-11 09:38:00 -05:00
613aa30493 logs 2026-03-08 16:15:41 -05:00
29f360ece8 logger console + file 2026-03-08 16:09:30 -05:00
62221e8121 fix 2026-03-08 14:54:33 -05:00
74c4f5801e Prometheus, Grafana, blackbox_exporter 2026-03-08 14:50:43 -05:00
90b120957d add missing import 2026-03-07 17:07:50 -06:00
99ab2a07d8 send mail even if parsing fails 2026-03-07 17:06:03 -06:00
d9a91c13ed printstats 2026-03-07 16:41:51 -06:00
1d53f2d357 pino 2026-03-07 15:34:15 -06:00
9586869c0c neue Ports 2026-03-07 15:26:56 -06:00
d1426afec5 new structure 2026-03-07 15:16:14 -06:00
3 changed files with 99 additions and 52 deletions

View File

@@ -8,24 +8,38 @@ from botocore.exceptions import ClientError
import time import time
import random import random
# Logging konfigurieren
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
sqs = boto3.client('sqs') sqs = boto3.client('sqs')
sns = boto3.client('sns')
sts_account_id = None
# Retry-Konfiguration
MAX_RETRIES = 3 MAX_RETRIES = 3
BASE_BACKOFF = 1 # Sekunden BASE_BACKOFF = 1
def exponential_backoff(attempt): def exponential_backoff(attempt):
"""Exponential Backoff mit Jitter"""
return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1) return BASE_BACKOFF * (2 ** attempt) + random.uniform(0, 1)
def get_account_id():
    """Return the AWS account ID, caching the STS lookup after the first call."""
    global sts_account_id
    if sts_account_id is None:
        identity = boto3.client('sts').get_caller_identity()
        sts_account_id = identity['Account']
    return sts_account_id
def get_topic_arn(domain):
    """
    Build the SNS topic ARN for a domain.

    Naming convention: domain.tld -> domain-tld-topic
    """
    region = os.environ.get('AWS_REGION', 'us-east-2')
    topic_name = f"{domain.replace('.', '-')}-topic"
    return f"arn:aws:sns:{region}:{get_account_id()}:{topic_name}"
def get_queue_url(domain): def get_queue_url(domain):
""" """
Generiert Queue-Namen aus Domain und holt URL. Fallback: Direkter SQS-Send für Domains ohne SNS-Topic.
Konvention: domain.tld -> domain-tld-queue
""" """
queue_name = domain.replace('.', '-') + '-queue' queue_name = domain.replace('.', '-') + '-queue'
try: try:
@@ -38,11 +52,53 @@ def get_queue_url(domain):
else: else:
raise raise
def publish_to_sns(topic_arn, message_body, msg_id):
    """Attempt to publish *message_body* to SNS.

    Returns True on success, False when the topic does not exist
    (the caller then falls back to a direct SQS send). Any other
    ClientError is retried with exponential backoff and re-raised
    once MAX_RETRIES attempts are exhausted.
    """
    attempt = 0
    while True:
        try:
            sns.publish(TopicArn=topic_arn, Message=message_body)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            # Missing topic is the expected fallback signal, not an error.
            if error_code in ('NotFound', 'NotFoundException'):
                logger.info(f" SNS Topic not found for {topic_arn} — falling back to SQS")
                return False
            attempt += 1
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Published {msg_id} to SNS: {topic_arn}")
            return True
def send_to_sqs(queue_url, message_body, msg_id):
    """Fallback path: send the message straight to SQS (legacy behavior).

    Retries with exponential backoff on ClientError and re-raises the
    last error once MAX_RETRIES attempts are exhausted.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
        except ClientError as e:
            error_code = e.response['Error']['Code']
            logger.warning(f"Retry {attempt}/{MAX_RETRIES} SQS: {error_code}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(exponential_backoff(attempt))
        else:
            logger.info(f"✅ Sent {msg_id} to SQS: {queue_url}")
            return
def lambda_handler(event, context): def lambda_handler(event, context):
""" """
Nimmt SES Event entgegen, extrahiert Domain dynamisch, Nimmt SES Event entgegen, extrahiert Domain dynamisch.
verpackt Metadaten als 'Fake SNS' und sendet an die domain-spezifische SQS. Strategie: SNS Publish (Fan-Out an Primary + Standby Queue).
Mit integrierter Retry-Logik für SQS-Send. Fallback: Direkter SQS-Send falls kein SNS-Topic existiert.
""" """
try: try:
records = event.get('Records', []) records = event.get('Records', [])
@@ -51,13 +107,12 @@ def lambda_handler(event, context):
for record in records: for record in records:
ses_data = record.get('ses', {}) ses_data = record.get('ses', {})
if not ses_data: if not ses_data:
logger.warning(f"Invalid SES event: Missing 'ses' in record: {record}") logger.warning(f"Invalid SES event: Missing 'ses' in record")
continue continue
mail = ses_data.get('mail', {}) mail = ses_data.get('mail', {})
receipt = ses_data.get('receipt', {}) receipt = ses_data.get('receipt', {})
# Domain extrahieren (aus erstem Recipient)
recipients = receipt.get('recipients', []) or mail.get('destination', []) recipients = receipt.get('recipients', []) or mail.get('destination', [])
if not recipients: if not recipients:
logger.warning("No recipients in event - skipping") logger.warning("No recipients in event - skipping")
@@ -69,23 +124,19 @@ def lambda_handler(event, context):
logger.error("Could not extract domain from recipient") logger.error("Could not extract domain from recipient")
continue continue
# Wichtige Metadaten loggen
msg_id = mail.get('messageId', 'unknown') msg_id = mail.get('messageId', 'unknown')
source = mail.get('source', 'unknown') source = mail.get('source', 'unknown')
logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}") logger.info(f"Processing Message-ID: {msg_id} for domain: {domain}")
logger.info(f" From: {source}") logger.info(f" From: {source}")
logger.info(f" To: {recipients}") logger.info(f" To: {recipients}")
# SES JSON als String serialisieren
ses_json_string = json.dumps(ses_data) ses_json_string = json.dumps(ses_data)
# Payload Größe loggen und checken (Safeguard)
payload_size = len(ses_json_string.encode('utf-8')) payload_size = len(ses_json_string.encode('utf-8'))
logger.info(f" Metadata Payload Size: {payload_size} bytes") logger.info(f" Metadata Payload Size: {payload_size} bytes")
if payload_size > 200000: # Arbitrary Limit < SQS 256KB if payload_size > 200000:
raise ValueError("Payload too large for SQS") raise ValueError("Payload too large")
# Fake SNS Payload
fake_sns_payload = { fake_sns_payload = {
"Type": "Notification", "Type": "Notification",
"MessageId": str(uuid.uuid4()), "MessageId": str(uuid.uuid4()),
@@ -95,26 +146,16 @@ def lambda_handler(event, context):
"Timestamp": datetime.utcnow().isoformat() + "Z" "Timestamp": datetime.utcnow().isoformat() + "Z"
} }
# Queue URL dynamisch holen message_body = json.dumps(fake_sns_payload)
queue_url = get_queue_url(domain)
# SQS Send mit Retries # Strategie: SNS zuerst, SQS als Fallback
attempt = 0 topic_arn = get_topic_arn(domain)
while attempt < MAX_RETRIES: sns_success = publish_to_sns(topic_arn, message_body, msg_id)
try:
sqs.send_message( if not sns_success:
QueueUrl=queue_url, # Kein SNS-Topic für diese Domain → direkt in SQS (wie bisher)
MessageBody=json.dumps(fake_sns_payload) queue_url = get_queue_url(domain)
) send_to_sqs(queue_url, message_body, msg_id)
logger.info(f"✅ Successfully forwarded {msg_id} to SQS: {queue_url}")
break
except ClientError as e:
attempt += 1
error_code = e.response['Error']['Code']
logger.warning(f"Retry {attempt}/{MAX_RETRIES} for SQS send: {error_code} - {str(e)}")
if attempt == MAX_RETRIES:
raise
time.sleep(exponential_backoff(attempt))
return {'status': 'ok'} return {'status': 'ok'}

View File

@@ -48,6 +48,9 @@ export const config = {
// Monitoring // Monitoring
metricsPort: parseInt(process.env.METRICS_PORT ?? '8000', 10), metricsPort: parseInt(process.env.METRICS_PORT ?? '8000', 10),
healthPort: parseInt(process.env.HEALTH_PORT ?? '8080', 10), healthPort: parseInt(process.env.HEALTH_PORT ?? '8080', 10),
queueSuffix: process.env.QUEUE_SUFFIX ?? '-queue',
standbyMode: (process.env.STANDBY_MODE ?? 'false').toLowerCase() === 'true',
} as const; } as const;
export type Config = typeof config; export type Config = typeof config;
@@ -106,7 +109,7 @@ export function isInternalAddress(email: string): boolean {
/** Convert domain to SQS queue name: bizmatch.net → bizmatch-net-queue */ /** Convert domain to SQS queue name: bizmatch.net → bizmatch-net-queue */
export function domainToQueueName(domain: string): string { export function domainToQueueName(domain: string): string {
return domain.replace(/\./g, '-') + '-queue'; return domain.replace(/\./g, '-') + config.queueSuffix;
} }
/** Convert domain to S3 bucket name: bizmatch.net → bizmatch-net-emails */ /** Convert domain to S3 bucket name: bizmatch.net → bizmatch-net-emails */

View File

@@ -36,6 +36,9 @@ export class RulesProcessor {
workerName: string, workerName: string,
metricsCallback?: MetricsCallback, metricsCallback?: MetricsCallback,
): Promise<boolean> { ): Promise<boolean> {
if (config.standbyMode) {
return false;
}
const rule = await this.dynamodb.getEmailRules(recipient.toLowerCase()); const rule = await this.dynamodb.getEmailRules(recipient.toLowerCase());
if (!rule) return false; if (!rule) return false;