first version
This commit is contained in:
94
app/lib/sync.ts
Normal file
94
app/lib/sync.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
import { db } from '@/app/db/drizzle';
|
||||
import { domains, emails } from '@/app/db/schema';
|
||||
import { getS3Client, getBody } from '@/app/lib/utils';
|
||||
import { simpleParser } from 'mailparser';
|
||||
import { ListBucketsCommand, ListObjectsV2Command, GetObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
|
||||
import { eq, sql } from 'drizzle-orm';
|
||||
import { Readable } from 'stream';
|
||||
import pRetry from 'p-retry'; // Für Retry bei Timeouts
|
||||
|
||||
/**
 * Synchronises every email bucket with the database.
 *
 * Lists all S3 buckets, keeps those whose name ends in `-emails`, makes sure a
 * row exists in `domains` for each of them (looked up by bucket name) and then
 * syncs the bucket's objects through `syncEmailsForDomain`.
 *
 * The domain name is derived from the bucket name by stripping the `-emails`
 * suffix and turning every remaining hyphen into a dot, so a domain that
 * itself contains a hyphen cannot be recovered from its bucket name.
 *
 * Buckets are processed one after another; an error in any step rejects the
 * returned promise and stops the run.
 *
 * @throws Error if the insert of a new domain returns no row.
 */
export async function syncAllDomains(): Promise<void> {
  console.log('Starting syncAllDomains...');

  const bucketSuffix = '-emails';
  const s3 = getS3Client();

  // Retry the listing a few times before giving up.
  const { Buckets } = await pRetry(() => s3.send(new ListBucketsCommand({})), { retries: 3 });

  const bucketNames = (Buckets ?? [])
    .map(b => b.Name)
    .filter((name): name is string => typeof name === 'string' && name.endsWith(bucketSuffix));
  console.log('Found domain buckets:', bucketNames.join(', ') || 'None');

  for (const bucket of bucketNames) {
    // Strip only the trailing suffix: String.replace with a string pattern
    // would remove the first "-emails" it finds, which is not necessarily the
    // one at the end of the name.
    const domainName = bucket.slice(0, -bucketSuffix.length).replace(/-/g, '.');
    console.log(`Processing bucket: ${bucket} (domain: ${domainName})`);

    // Upsert domain: reuse the existing row for this bucket or create one.
    const [existingDomain] = await db.select().from(domains).where(eq(domains.bucket, bucket)).limit(1);
    console.log(`Existing domain for ${bucket}: ${existingDomain ? 'Yes (ID: ' + existingDomain.id + ')' : 'No'}`);

    let domainId: number;
    if (existingDomain) {
      domainId = existingDomain.id;
    } else {
      const [newDomain] = await db
        .insert(domains)
        .values({ bucket, domain: domainName })
        .returning({ id: domains.id });
      if (!newDomain) {
        throw new Error(`Insert for domain bucket ${bucket} returned no row`);
      }
      domainId = newDomain.id;
      console.log(`Created new domain ID: ${domainId}`);
    }

    // Sync emails
    await syncEmailsForDomain(domainId, bucket);
  }

  console.log('syncAllDomains completed.');
}
|
||||
|
||||
/** Pause inserted after each processed object to avoid S3 throttling. */
const SYNC_THROTTLE_DELAY_MS = 100;

/** Minimal structural view of a parsed address header (to / cc / bcc). */
type ParsedAddressHeader = { value: { address?: string }[] };

/**
 * Flattens a parsed address header (single object, array of objects, or
 * absent) into a list of lower-cased addresses. Entries without an address
 * are kept as empty strings, so the result has one element per parsed entry.
 */
function extractAddresses(header: ParsedAddressHeader | ParsedAddressHeader[] | undefined): string[] {
  if (!header) return [];
  const headers = Array.isArray(header) ? header : [header];
  return headers.flatMap(h => h.value.map(v => v.address?.toLowerCase() || ''));
}

/**
 * Tells whether object metadata carries the "processed" marker configured via
 * the PROCESSED_META_KEY / PROCESSED_META_VALUE environment variables.
 *
 * Returns false when either variable is missing: comparing an absent metadata
 * entry with an absent expected value would otherwise evaluate to
 * `undefined === undefined` and flag every email as processed.
 */
function hasProcessedMarker(metadata: Record<string, string> | undefined): boolean {
  const metaKey = process.env.PROCESSED_META_KEY;
  const metaValue = process.env.PROCESSED_META_VALUE;
  if (!metaKey || metaValue === undefined) return false;
  return metadata?.[metaKey] === metaValue;
}

/**
 * Mirrors the objects of one email bucket into the `emails` table.
 *
 * For every object key in the bucket:
 *  - if a row with that `s3Key` already exists, only its `processed` flag is
 *    refreshed from the object's metadata (HEAD request);
 *  - otherwise the object is downloaded, parsed with `simpleParser` and
 *    inserted as a new row linked to `domainId`.
 *
 * Objects are handled sequentially with a short pause after each one. Any
 * S3, parser or database error rejects the returned promise.
 *
 * NOTE(review): rows are looked up by `s3Key` alone, not by domain — this
 * assumes keys are unique across buckets; confirm against the schema.
 *
 * @param domainId - id of the `domains` row the emails belong to.
 * @param bucket - name of the S3 bucket to read from.
 */
async function syncEmailsForDomain(domainId: number, bucket: string): Promise<void> {
  console.log(`Starting syncEmailsForDomain for bucket: ${bucket} (domainId: ${domainId})`);
  const s3 = getS3Client();

  // A single ListObjectsV2 call returns one page only; follow the
  // continuation token until the listing is complete.
  const objects: { Key?: string; LastModified?: Date }[] = [];
  let continuationToken: string | undefined;
  do {
    const token = continuationToken;
    const page = await pRetry(
      () => s3.send(new ListObjectsV2Command({ Bucket: bucket, ContinuationToken: token })),
      { retries: 3 },
    );
    objects.push(...(page.Contents ?? []));
    continuationToken = page.IsTruncated ? page.NextContinuationToken : undefined;
  } while (continuationToken);
  console.log(`Found objects in bucket: ${objects.length}`);

  for (const obj of objects) {
    const key = obj.Key;
    if (!key) continue;
    console.log(`Processing object: ${key}`);

    // Check if the email is already stored; only the flag is needed, so do
    // not pull the full row (raw source and HTML) out of the database.
    const [existing] = await db
      .select({ processed: emails.processed })
      .from(emails)
      .where(eq(emails.s3Key, key))
      .limit(1);
    console.log(`Existing email for ${key}: ${existing ? 'Yes' : 'No'}`);

    if (existing) {
      // Known email: update the processed flag if the metadata changed.
      const head = await pRetry(() => s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), { retries: 3 });
      const processed = hasProcessedMarker(head.Metadata);
      console.log(`Processed metadata for ${key}: ${processed} (DB: ${existing.processed})`);
      if (existing.processed !== processed) {
        await db.update(emails).set({ processed }).where(eq(emails.s3Key, key));
        console.log(`Updated processed for ${key}`);
      }
      await new Promise(resolve => setTimeout(resolve, SYNC_THROTTLE_DELAY_MS)); // small delay against throttling
      continue;
    }

    // New email: download, parse and insert. The GetObject response already
    // carries the user metadata, so no extra HEAD request is needed.
    console.log(`Parsing new email: ${key}`);
    const { Body, Metadata } = await pRetry(
      () => s3.send(new GetObjectCommand({ Bucket: bucket, Key: key })),
      { retries: 3 },
    );
    const raw = await getBody(Body as Readable);
    const parsed = await simpleParser(raw);

    await db.insert(emails).values({
      domainId,
      s3Key: key,
      // The From header may parse to an empty address list.
      from: parsed.from?.value[0]?.address,
      to: extractAddresses(parsed.to),
      cc: extractAddresses(parsed.cc),
      bcc: extractAddresses(parsed.bcc),
      subject: parsed.subject,
      html: parsed.html || parsed.textAsHtml,
      raw: raw.toString('utf-8'),
      processed: hasProcessedMarker(Metadata),
      // Fall back to the object's modification time when the mail has no Date header.
      date: parsed.date ?? obj.LastModified,
    });
    console.log(`Inserted new email: ${key}`);
    await new Promise(resolve => setTimeout(resolve, SYNC_THROTTLE_DELAY_MS)); // delay against throttling
  }

  console.log(`syncEmailsForDomain completed for bucket: ${bucket}`);
}
|
||||
Reference in New Issue
Block a user