2025-09-17 19:32:40 -05:00
parent 6d12e7e151
commit 2cac2af5ab
9 changed files with 395 additions and 135 deletions


import { db } from '@/app/db/drizzle';
import { domains, emails } from '@/app/db/schema';
import { getS3Client, getBody } from '@/app/lib/utils';
import { simpleParser } from 'mailparser';
import { ListBucketsCommand, ListObjectsV2Command, GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { eq, sql, inArray } from 'drizzle-orm';
import { Readable } from 'stream';
import pRetry from 'p-retry'; // Retry on transient failures and timeouts
import pLimit from 'p-limit'; // Parallel processing with a concurrency limit

// Configurable constants
const CONCURRENT_S3_OPERATIONS = 10; // Parallel S3 operations
const BATCH_INSERT_SIZE = 100; // Batch size for DB inserts
const CONCURRENT_EMAIL_PARSING = 5; // Parallel email parsers
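// These defaults are assumptions; tune them against your S3 request limits
// and the DB connection pool size.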
export async function syncAllDomains() {
console.log('Starting optimized syncAllDomains...');
const startTime = Date.now();
const s3 = getS3Client();
const { Buckets } = await pRetry(() => s3.send(new ListBucketsCommand({})), { retries: 3 });
const domainBuckets = Buckets?.filter(b => b.Name?.endsWith('-emails')) || [];
console.log(`Found ${domainBuckets.length} domain buckets`);
// Process the buckets in parallel
const bucketLimit = pLimit(3); // At most 3 buckets in parallel
await Promise.all(
domainBuckets.map(bucketObj =>
bucketLimit(async () => {
const bucket = bucketObj.Name!;
const domainName = bucket.replace('-emails', '').replace(/-/g, '.');
console.log(`Processing bucket: ${bucket}`);
// Upsert domain
const [domain] = await db
.insert(domains)
.values({ bucket, domain: domainName })
.onConflictDoUpdate({
target: domains.bucket,
set: { domain: domainName }
})
.returning({ id: domains.id });
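// Note: onConflictDoUpdate targets domains.bucket, which assumes a unique
// constraint (or unique index) on that column in the schema.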
// Sync emails for this domain
await syncEmailsForDomainOptimized(domain.id, bucket, s3);
})
)
);
const duration = (Date.now() - startTime) / 1000;
console.log(`syncAllDomains completed in ${duration}s`);
}
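// Usage sketch (hypothetical, not part of this commit): the sync could be
// triggered from a Next.js route handler or a cron job. The route path and
// import path below are assumptions for illustration only.
//
// // app/api/sync/route.ts
// import { syncAllDomains } from '@/app/lib/sync';
//
// export async function GET() {
//   await syncAllDomains();
//   return Response.json({ ok: true });
// }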
async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3: S3Client) {
console.log(`Starting optimized sync for bucket: ${bucket}`);
const startTime = Date.now();
// 1. Fetch all S3 keys at once (paginated)
let allS3Keys: string[] = [];
let continuationToken: string | undefined;
do {
const response = await pRetry(
() => s3.send(new ListObjectsV2Command({
Bucket: bucket,
MaxKeys: 1000, // Maximum keys per request
ContinuationToken: continuationToken
})),
{ retries: 3 }
);
allS3Keys.push(...(response.Contents?.map(obj => obj.Key!).filter(Boolean) || []));
continuationToken = response.NextContinuationToken;
} while (continuationToken);
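// ListObjectsV2 returns at most 1000 keys per page; NextContinuationToken
// signals that more pages exist, so we loop until it is absent.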
console.log(`Found ${allS3Keys.length} objects in bucket`);
if (allS3Keys.length === 0) return;
// 2. Fetch all existing emails from the DB in a single query
const existingEmails = await db
.select({
s3Key: emails.s3Key,
processed: emails.processed
})
.from(emails)
.where(inArray(emails.s3Key, allS3Keys));
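// Caveat: for very large buckets this single inArray may exceed driver
// limits on bound parameters (Postgres caps at 65535); chunking the key
// list would be a safe extension.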
const existingKeysMap = new Map(
existingEmails.map(e => [e.s3Key, e.processed])
);
console.log(`Found ${existingEmails.length} existing emails in DB`);
// 3. Decide what needs to be done
const toInsert: string[] = [];
const toCheckProcessed: string[] = [];
for (const key of allS3Keys) {
if (!existingKeysMap.has(key)) {
toInsert.push(key);
} else {
toCheckProcessed.push(key);
}
}
console.log(`To insert: ${toInsert.length}, To check: ${toCheckProcessed.length}`);
// 4. Update processed status in parallel
if (toCheckProcessed.length > 0) {
const updateLimit = pLimit(CONCURRENT_S3_OPERATIONS);
const updatePromises = toCheckProcessed.map(key =>
updateLimit(async () => {
try {
const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
const processed = head.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
const currentProcessed = existingKeysMap.get(key);
if (currentProcessed !== processed) {
await db
.update(emails)
.set({ processed })
.where(eq(emails.s3Key, key));
console.log(`Updated processed status for ${key}`);
}
} catch (error) {
console.error(`Error checking ${key}:`, error);
}
})
);
await Promise.all(updatePromises);
}
// 5. Batch insert for new emails
if (toInsert.length > 0) {
console.log(`Processing ${toInsert.length} new emails...`);
// Process in batches
for (let i = 0; i < toInsert.length; i += BATCH_INSERT_SIZE) {
const batch = toInsert.slice(i, i + BATCH_INSERT_SIZE);
console.log(`Processing batch ${Math.floor(i/BATCH_INSERT_SIZE) + 1}/${Math.ceil(toInsert.length/BATCH_INSERT_SIZE)}`);
// Fetch and parse in parallel
const parseLimit = pLimit(CONCURRENT_EMAIL_PARSING);
const emailDataPromises = batch.map(key =>
parseLimit(async () => {
try {
// Fetch object body and metadata in parallel
const [getObjResponse, headResponse] = await Promise.all([
pRetry(() => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 }),
pRetry(() => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 })
]);
const raw = await getBody(getObjResponse.Body as Readable);
const parsed = await simpleParser(raw, {
skipHtmlToText: true, // Faster when a text version is not needed
skipTextContent: false,
skipImageLinks: true
});
const to = extractAddresses(parsed.to);
const cc = extractAddresses(parsed.cc);
const bcc = extractAddresses(parsed.bcc);
return {
domainId,
s3Key: key,
from: parsed.from?.value[0]?.address,
to,
cc,
bcc,
subject: parsed.subject,
html: parsed.html || parsed.textAsHtml,
raw: raw.toString('utf-8'),
processed: headResponse.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
date: parsed.date || headResponse.LastModified,
};
} catch (error) {
console.error(`Error processing ${key}:`, error);
return null;
}
})
);
const emailData = (await Promise.all(emailDataPromises)).filter(Boolean);
// Batch Insert
if (emailData.length > 0) {
await db.insert(emails).values(emailData);
console.log(`Inserted ${emailData.length} emails`);
}
}
}
const duration = (Date.now() - startTime) / 1000;
console.log(`Sync for ${bucket} completed in ${duration}s`);
}
// Helper function for extracting addresses
function extractAddresses(addressObj: any): string[] {
if (!addressObj) return [];
if (Array.isArray(addressObj)) {
return addressObj.flatMap(t => t.value.map((v: any) => v.address?.toLowerCase() || '')).filter(Boolean);
}
return addressObj.value?.map((v: any) => v.address?.toLowerCase() || '').filter(Boolean) || [];
}
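// Example: for a parsed header "To: Alice <Alice@Example.com>, bob@example.com"
// this returns ['alice@example.com', 'bob@example.com'].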
// Optimized variant with stream processing for very large buckets
export async function syncEmailsForDomainStreaming(domainId: number, bucket: string, s3: S3Client) {
console.log(`Starting streaming sync for bucket: ${bucket}`);
// S3 Select (where supported) could reduce the transferred data volume
// considerably for large datasets; this variant sticks to plain listing.
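// Note: unlike syncEmailsForDomainOptimized above, this variant only inserts
// brand-new keys; it does not refresh the processed flag of existing rows.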
const existingKeys = new Set(
(await db
.select({ s3Key: emails.s3Key })
.from(emails)
.where(eq(emails.domainId, domainId))
).map(e => e.s3Key)
);
const pendingBatches: Promise<void>[] = [];
const QUEUE_SIZE = 50;
// Stream-based processing: handle each listing page as it arrives
let continuationToken: string | undefined;
do {
const response = await s3.send(new ListObjectsV2Command({
Bucket: bucket,
MaxKeys: 100,
ContinuationToken: continuationToken
}));
const newKeys = response.Contents?.filter(obj =>
obj.Key && !existingKeys.has(obj.Key)
) || [];
// Process batches in parallel while we keep listing further keys
if (newKeys.length > 0) {
for (let i = 0; i < newKeys.length; i += QUEUE_SIZE) {
// Kick off the batch without awaiting it here; the promise is collected
// so the function can await all batches before returning
pendingBatches.push(processBatchAsync(newKeys.slice(i, i + QUEUE_SIZE), bucket, domainId, s3));
}
}
continuationToken = response.NextContinuationToken;
} while (continuationToken);
await Promise.all(pendingBatches); // Wait for all in-flight batches to finish
}
async function processBatchAsync(batch: any[], bucket: string, domainId: number, s3: S3Client) {
// Process one batch asynchronously without blocking the listing loop
const emailData = await Promise.all(
batch.map(async (obj) => {
try {
const [getObjResponse, headResponse] = await Promise.all([
s3.send(new GetObjectCommand({ Bucket: bucket, Key: obj.Key })),
s3.send(new HeadObjectCommand({ Bucket: bucket, Key: obj.Key }))
]);
const raw = await getBody(getObjResponse.Body as Readable);
const parsed = await simpleParser(raw);
return {
domainId,
s3Key: obj.Key,
from: parsed.from?.value[0]?.address,
to: extractAddresses(parsed.to),
cc: extractAddresses(parsed.cc),
bcc: extractAddresses(parsed.bcc),
subject: parsed.subject,
html: parsed.html || parsed.textAsHtml,
raw: raw.toString('utf-8'),
processed: headResponse.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
date: parsed.date || obj.LastModified,
};
} catch (error) {
console.error(`Error processing ${obj.Key}:`, error);
return null;
}
})
);
const validData = emailData.filter(Boolean);
if (validData.length > 0) {
await db.insert(emails).values(validData);
}
}