From 2bd498a35d940fd7fb0d6e12be390f1069de21f4 Mon Sep 17 00:00:00 2001 From: Jackson Roberts Date: Fri, 10 Apr 2026 04:53:57 +0000 Subject: [PATCH] fix(sync): use CW watermark incremental path for critical tables --- dalpuri/src/sync.ts | 104 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 3 deletions(-) diff --git a/dalpuri/src/sync.ts b/dalpuri/src/sync.ts index a983f2c..c89ee2c 100644 --- a/dalpuri/src/sync.ts +++ b/dalpuri/src/sync.ts @@ -79,20 +79,46 @@ const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([ "Companies", "Company Addresses", "Contacts", + "Opportunities", +]); + +const CRITICAL_CW_WATERMARK_TABLES = new Set([ + "Companies", + "Company Addresses", + "Contacts", + "Opportunities", ]); const criticalFullSyncIntervalMinutes = Math.max( 1, Number.parseInt( - process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60", + process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15", 10 - ) || 60 + ) || 15 ); const CRITICAL_FULL_SYNC_INTERVAL_MS = criticalFullSyncIntervalMinutes * 60 * 1000; +const criticalCwWatermarkOverlapSeconds = Math.max( + 5, + Number.parseInt( + process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60", + 10 + ) || 60 +); + +const CRITICAL_CW_WATERMARK_OVERLAP_MS = + criticalCwWatermarkOverlapSeconds * 1000; + +const criticalCwDeltaLimit = Math.max( + 100, + Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) || + 5000 +); + const lastCriticalFullSyncByStep = new Map(); +const lastCriticalCwWatermarkByStep = new Map(); const shouldForceCriticalFullSync = ( step: Step, @@ -112,6 +138,73 @@ const shouldForceCriticalFullSync = ( return true; }; +const computeCriticalCwWatermarkDecision = async ( + cwPrisma: CwPrismaClient, + step: Step, + forceIncremental: boolean +): Promise => { + if (!forceIncremental) return null; + if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null; + + const cwDelegate = ( + cwPrisma as unknown as Record + )[step.sourceModel]; + + if (!cwDelegate) { + return null; + } + + const existingWhere = + (step.sourceArgs as Record | undefined)?.where ?? {}; + + const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name); + const lowerBound = lastWatermark + ? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS) + : new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS); + + const rows = (await cwDelegate.findMany({ + select: { + [step.sourceIdField]: true, + [step.sourceUpdatedField]: true, + }, + where: { + ...(existingWhere as Record), + [step.sourceUpdatedField]: { + gte: lowerBound, + }, + }, + orderBy: { [step.sourceUpdatedField]: "asc" }, + take: criticalCwDeltaLimit, + })) as Row[]; + + if (rows.length >= criticalCwDeltaLimit) { + console.warn( + ` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync` + ); + return { mode: "full", differences: [] }; + } + + if (rows.length > 0) { + const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null; + if (latest) { + lastCriticalCwWatermarkByStep.set(step.name, latest); + } + } else if (!lastWatermark) { + lastCriticalCwWatermarkByStep.set(step.name, new Date()); + } + + const sourceIds = rows.map((r) => r[step.sourceIdField] as number); + console.log( + ` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}` + ); + + return { + mode: "incremental", + sourceIds, + differences: [], + }; +}; + const parseEnvFile = (path: string): Record => { const envData = readFileSync(path, "utf8"); const out: Record = {}; @@ -1808,13 +1901,18 @@ export const executeFullDalpuriSync = async (options?: { step, forceIncremental ); + const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision( + cwPrisma, + step, + forceIncremental + ); const forceCriticalFullSync = shouldForceCriticalFullSync( step, forceIncremental ); const effectiveDecision = forceCriticalFullSync ? ({ mode: "full", differences: decision.differences } as SmartSyncDecision) - : decision; + : criticalWatermarkDecision ?? decision; if (forceCriticalFullSync) { console.log(