app/lib/sync.ts (151 additions, 151 deletions)
@@ -5,13 +5,13 @@ import { simpleParser } from 'mailparser';
import { ListBucketsCommand, ListObjectsV2Command, GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { eq, sql, inArray } from 'drizzle-orm';
import { Readable } from 'stream';
import pLimit from 'p-limit'; // For parallel processing with a concurrency limit
import pLimit from 'p-limit';
import pRetry from 'p-retry';

// Configurable constants
const CONCURRENT_S3_OPERATIONS = 10; // Parallel S3 operations
const BATCH_INSERT_SIZE = 100; // Batch size for DB inserts
const CONCURRENT_EMAIL_PARSING = 5; // Parallel e-mail parsers
const CONCURRENT_S3_OPERATIONS = 10;
const BATCH_INSERT_SIZE = 100;
const CONCURRENT_EMAIL_PARSING = 5;

export async function syncAllDomains() {
console.log('Starting optimized syncAllDomains...');
@@ -22,8 +22,7 @@ export async function syncAllDomains() {
const domainBuckets = Buckets?.filter(b => b.Name?.endsWith('-emails')) || [];
console.log(`Found ${domainBuckets.length} domain buckets`);

// Process the buckets in parallel
const bucketLimit = pLimit(3); // Max 3 buckets in parallel
const bucketLimit = pLimit(3);

await Promise.all(
domainBuckets.map(bucketObj =>
@@ -32,7 +31,6 @@ export async function syncAllDomains() {
const domainName = bucket.replace('-emails', '').replace(/-/g, '.');
console.log(`Processing bucket: ${bucket}`);

// Upsert domain
const [domain] = await db
.insert(domains)
.values({ bucket, domain: domainName })
@@ -63,7 +61,7 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
const response = await pRetry(
() => s3.send(new ListObjectsV2Command({
Bucket: bucket,
MaxKeys: 1000, // Maximum per request
MaxKeys: 1000,
ContinuationToken: continuationToken
})),
{ retries: 3 }
@@ -94,37 +92,45 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3

// 3. Determine what needs to be done
const toInsert: string[] = [];
const toCheckProcessed: string[] = [];
const toUpdate: string[] = [];

for (const key of allS3Keys) {
if (!existingKeysMap.has(key)) {
toInsert.push(key);
} else {
toCheckProcessed.push(key);
toUpdate.push(key);
}
}

console.log(`To insert: ${toInsert.length}, To check: ${toCheckProcessed.length}`);
console.log(`To insert: ${toInsert.length}, To update: ${toUpdate.length}`);

// 4. Process the updates in parallel (processed status)
if (toCheckProcessed.length > 0) {
// 4. Process the updates in parallel (metadata)
if (toUpdate.length > 0) {
const updateLimit = pLimit(CONCURRENT_S3_OPERATIONS);
const updatePromises = toCheckProcessed.map(key =>
const updatePromises = toUpdate.map(key =>
updateLimit(async () => {
try {
const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
const processed = head.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
const currentProcessed = existingKeysMap.get(key);
const metadata = head.Metadata || {};

if (currentProcessed !== processed) {
await db
.update(emails)
.set({ processed })
.where(eq(emails.s3Key, key));
console.log(`Updated processed status for ${key}`);
}
const processed = metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!;
const processedAt = metadata['processed_at'] ? new Date(metadata['processed_at']) : null;
const processedBy = metadata['processed_by'] || null;
const queuedTo = metadata['queued_to'] || null;
const status = metadata['status'] || null;

await db
.update(emails)
.set({
processed,
processedAt,
processedBy,
queuedTo,
status
})
.where(eq(emails.s3Key, key));
} catch (error) {
console.error(`Error checking ${key}:`, error);
console.error(`Error updating ${key}:`, error);
}
})
);
@@ -136,17 +142,14 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
if (toInsert.length > 0) {
console.log(`Processing ${toInsert.length} new emails...`);

// Process in batches
for (let i = 0; i < toInsert.length; i += BATCH_INSERT_SIZE) {
const batch = toInsert.slice(i, i + BATCH_INSERT_SIZE);
console.log(`Processing batch ${Math.floor(i/BATCH_INSERT_SIZE) + 1}/${Math.ceil(toInsert.length/BATCH_INSERT_SIZE)}`);

// Parallel fetching and parsing
const parseLimit = pLimit(CONCURRENT_EMAIL_PARSING);
const emailDataPromises = batch.map(key =>
parseLimit(async () => {
try {
// Fetch the object and its metadata in parallel
const [getObjResponse, headResponse] = await Promise.all([
pRetry(() => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 }),
pRetry(() => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), { retries: 2 })
@@ -154,11 +157,12 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3

const raw = await getBody(getObjResponse.Body as Readable);
const parsed = await simpleParser(raw, {
skipHtmlToText: true, // Faster when the text version is not needed
skipHtmlToText: true,
skipTextContent: false,
skipImageLinks: true
});

const metadata = headResponse.Metadata || {};
const to = extractAddresses(parsed.to);
const cc = extractAddresses(parsed.cc);
const bcc = extractAddresses(parsed.bcc);
@@ -173,8 +177,13 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
subject: parsed.subject,
html: parsed.html || parsed.textAsHtml,
raw: raw.toString('utf-8'),
processed: headResponse.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
processed: metadata[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
date: parsed.date || headResponse.LastModified,
// New metadata fields
processedAt: metadata['processed_at'] ? new Date(metadata['processed_at']) : null,
processedBy: metadata['processed_by'] || null,
queuedTo: metadata['queued_to'] || null,
status: metadata['status'] || null,
};
} catch (error) {
console.error(`Error processing ${key}:`, error);
@@ -185,7 +194,6 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3

const emailData = (await Promise.all(emailDataPromises)).filter(Boolean);

// Batch insert
if (emailData.length > 0) {
await db.insert(emails).values(emailData);
console.log(`Inserted ${emailData.length} emails`);
@@ -197,93 +205,10 @@ async function syncEmailsForDomainOptimized(domainId: number, bucket: string, s3
console.log(`Sync for ${bucket} completed in ${duration}s`);
}

// Helper function for address extraction
function extractAddresses(addressObj: any): string[] {
if (!addressObj) return [];
if (Array.isArray(addressObj)) {
return addressObj.flatMap(t => t.value.map((v: any) => v.address?.toLowerCase() || '')).filter(Boolean);
}
return addressObj.value?.map((v: any) => v.address?.toLowerCase() || '').filter(Boolean) || [];
}

// Optimized variant with stream processing for very large buckets
export async function syncEmailsForDomainStreaming(domainId: number, bucket: string, s3: S3Client) {
console.log(`Starting streaming sync for bucket: ${bucket}`);

// Use S3 Select for large data sets (if supported)
// This significantly reduces the amount of data transferred

const existingKeys = new Set(
(await db
.select({ s3Key: emails.s3Key })
.from(emails)
.where(eq(emails.domainId, domainId))
).map(e => e.s3Key)
);

const processQueue: any[] = [];
const QUEUE_SIZE = 50;

// Stream-based processing
let continuationToken: string | undefined;

do {
const response = await s3.send(new ListObjectsV2Command({
Bucket: bucket,
MaxKeys: 100,
ContinuationToken: continuationToken
}));

const newKeys = response.Contents?.filter(obj =>
obj.Key && !existingKeys.has(obj.Key)
) || [];

// Process in parallel while we fetch more keys
if (newKeys.length > 0) {
const batch = newKeys.slice(0, QUEUE_SIZE);
// Process the batch asynchronously (without await)
processBatchAsync(batch, bucket, domainId, s3);
}

continuationToken = response.NextContinuationToken;
} while (continuationToken);
}

async function processBatchAsync(batch: any[], bucket: string, domainId: number, s3: S3Client) {
// Asynchronous batch processing without blocking the main thread
const emailData = await Promise.all(
batch.map(async (obj) => {
try {
const [getObjResponse, headResponse] = await Promise.all([
s3.send(new GetObjectCommand({ Bucket: bucket, Key: obj.Key })),
s3.send(new HeadObjectCommand({ Bucket: bucket, Key: obj.Key }))
]);

const raw = await getBody(getObjResponse.Body as Readable);
const parsed = await simpleParser(raw);

return {
domainId,
s3Key: obj.Key,
from: parsed.from?.value[0]?.address,
to: extractAddresses(parsed.to),
cc: extractAddresses(parsed.cc),
bcc: extractAddresses(parsed.bcc),
subject: parsed.subject,
html: parsed.html || parsed.textAsHtml,
raw: raw.toString('utf-8'),
processed: headResponse.Metadata?.[process.env.PROCESSED_META_KEY!] === process.env.PROCESSED_META_VALUE!,
date: parsed.date || obj.LastModified,
};
} catch (error) {
console.error(`Error processing ${obj.Key}:`, error);
return null;
}
})
);

const validData = emailData.filter(Boolean);
if (validData.length > 0) {
await db.insert(emails).values(validData);
}
}
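
For reference, both code paths above call a getBody helper that is defined elsewhere in sync.ts and is not touched by this commit. Judging from its usage (it receives getObjResponse.Body as a Readable, and the result is later passed to raw.toString('utf-8')), it presumably buffers the S3 object stream. A minimal sketch under that assumption, illustrative only and not the actual helper:

// Hypothetical sketch of the getBody helper referenced above; assumes it
// simply collects the S3 body stream into a single Buffer.
async function getBody(stream: Readable): Promise<Buffer> {
  const chunks: Buffer[] = [];
  for await (const chunk of stream) {
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
  }
  return Buffer.concat(chunks);
}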