Compare commits

...

2 Commits

Author SHA1 Message Date
9862689c0c no markAsBlocked, 2026-04-12 20:43:37 -05:00
bed6c2a398 fix 2026-04-03 17:03:14 -05:00
2 changed files with 35 additions and 29 deletions

View File

@@ -53,7 +53,6 @@ def get_queue_url(domain):
raise raise
def publish_to_sns(topic_arn, message_body, msg_id): def publish_to_sns(topic_arn, message_body, msg_id):
"""Versucht SNS Publish. Gibt True bei Erfolg, False bei Topic-Not-Found."""
attempt = 0 attempt = 0
while attempt < MAX_RETRIES: while attempt < MAX_RETRIES:
try: try:
@@ -65,8 +64,9 @@ def publish_to_sns(topic_arn, message_body, msg_id):
return True return True
except ClientError as e: except ClientError as e:
error_code = e.response['Error']['Code'] error_code = e.response['Error']['Code']
if error_code == 'NotFound' or error_code == 'NotFoundException': # Fallback auf SQS bei Topic-nicht-gefunden ODER fehlender Berechtigung
logger.info(f" SNS Topic not found for {topic_arn} — falling back to SQS") if error_code in ('NotFound', 'NotFoundException', 'AuthorizationError'):
logger.info(f" SNS unavailable for {topic_arn} ({error_code}) — falling back to SQS")
return False return False
attempt += 1 attempt += 1
logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}") logger.warning(f"Retry {attempt}/{MAX_RETRIES} SNS: {error_code}")

View File

@@ -26,7 +26,7 @@ import { BlocklistChecker } from '../email/blocklist.js';
import { BounceHandler } from '../email/bounce-handler.js'; import { BounceHandler } from '../email/bounce-handler.js';
import { parseEmail, isProcessedByWorker } from '../email/parser.js'; import { parseEmail, isProcessedByWorker } from '../email/parser.js';
import { RulesProcessor } from '../email/rules-processor.js'; import { RulesProcessor } from '../email/rules-processor.js';
import { config } from '../config.js';
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Processor // Processor
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -258,34 +258,40 @@ export class MessageProcessor {
if (totalHandled === recipients.length) { if (totalHandled === recipients.length) {
if (blockedRecipients.length === recipients.length) { if (blockedRecipients.length === recipients.length) {
// All blocked // All blocked — im Standby kein S3 anfassen
try { if (!config.standbyMode) {
await this.s3.markAsBlocked( try {
domain, await this.s3.markAsBlocked(
messageId, domain,
blockedRecipients, messageId,
fromAddrFinal, blockedRecipients,
workerName, fromAddrFinal,
); workerName,
await this.s3.deleteBlockedEmail(domain, messageId, workerName); );
} catch (err: any) { await this.s3.deleteBlockedEmail(domain, messageId, workerName);
log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName); } catch (err: any) {
return false; log(`⚠ Failed to handle blocked email: ${err.message ?? err}`, 'ERROR', workerName);
return false;
}
} }
} else if (successful.length > 0) { } else if (successful.length > 0) {
await this.s3.markAsProcessed( if (!config.standbyMode) {
domain, await this.s3.markAsProcessed(
messageId, domain,
workerName, messageId,
failedPermanent.length > 0 ? failedPermanent : undefined, workerName,
); failedPermanent.length > 0 ? failedPermanent : undefined,
);
}
} else if (failedPermanent.length > 0) { } else if (failedPermanent.length > 0) {
await this.s3.markAsAllInvalid( if (!config.standbyMode) {
domain, await this.s3.markAsAllInvalid(
messageId, domain,
failedPermanent, messageId,
workerName, failedPermanent,
); workerName,
);
}
} }
// Summary // Summary