Compare commits

...

2 Commits

Author SHA1 Message Date
9862689c0c no markAsBlocked, 2026-04-12 20:43:37 -05:00
bed6c2a398 fix 2026-04-03 17:03:14 -05:00
2 changed files with 35 additions and 29 deletions

View File

@@ -53,7 +53,6 @@ def get_queue_url(domain):
raise
def publish_to_sns(topic_arn, message_body, msg_id):
"""Versucht SNS Publish. Gibt True bei Erfolg, False bei Topic-Not-Found."""
attempt = 0
while attempt < MAX_RETRIES:
try:
@@ -65,8 +64,9 @@ def publish_to_sns(topic_arn, message_body, msg_id):
return True
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'NotFound' or error_code == 'NotFoundException':
logger.info(f" SNS Topic not found for {topic_arn} — falling back to SQS")
# Fallback auf SQS bei Topic-nicht-gefunden ODER fehlender Berechtigung
if error_code in ('NotFound', 'NotFoundException', 'AuthorizationError'):
logger.info(f" SNS unavailable for {topic_arn} ({error_code}) — falling back to SQS")
return False
attempt += 1
logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}")

View File

@@ -26,7 +26,7 @@ import { BlocklistChecker } from '../email/blocklist.js';
import { BounceHandler } from '../email/bounce-handler.js';
import { parseEmail, isProcessedByWorker } from '../email/parser.js';
import { RulesProcessor } from '../email/rules-processor.js';
import { config } from '../config.js';
// ---------------------------------------------------------------------------
// Processor
// ---------------------------------------------------------------------------
@@ -258,34 +258,40 @@ export class MessageProcessor {
if (totalHandled === recipients.length) {
if (blockedRecipients.length === recipients.length) {
// All blocked
try {
await this.s3.markAsBlocked(
domain,
messageId,
blockedRecipients,
fromAddrFinal,
workerName,
);
await this.s3.deleteBlockedEmail(domain, messageId, workerName);
} catch (err: any) {
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
return false;
// All blocked — im Standby kein S3 anfassen
if (!config.standbyMode) {
try {
await this.s3.markAsBlocked(
domain,
messageId,
blockedRecipients,
fromAddrFinal,
workerName,
);
await this.s3.deleteBlockedEmail(domain, messageId, workerName);
} catch (err: any) {
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
return false;
}
}
} else if (successful.length > 0) {
await this.s3.markAsProcessed(
domain,
messageId,
workerName,
failedPermanent.length > 0 ? failedPermanent : undefined,
);
if (!config.standbyMode) {
await this.s3.markAsProcessed(
domain,
messageId,
workerName,
failedPermanent.length > 0 ? failedPermanent : undefined,
);
}
} else if (failedPermanent.length > 0) {
await this.s3.markAsAllInvalid(
domain,
messageId,
failedPermanent,
workerName,
);
if (!config.standbyMode) {
await this.s3.markAsAllInvalid(
domain,
messageId,
failedPermanent,
workerName,
);
}
}
// Summary