From 86d7426e8bf65f220581f54f998b48b75cc1e355 Mon Sep 17 00:00:00 2001 From: Jackson Roberts Date: Fri, 10 Apr 2026 04:36:36 +0000 Subject: [PATCH] fix(sync): harden incremental observability and periodic reconciliation --- api/src/modules/workers/dalpuri-sync.ts | 51 ++++++++++++++++++- dalpuri/src/sync.ts | 65 ++++++++++++++++++++++--- 2 files changed, 109 insertions(+), 7 deletions(-) diff --git a/api/src/modules/workers/dalpuri-sync.ts b/api/src/modules/workers/dalpuri-sync.ts index f83b6e2..4f70807 100644 --- a/api/src/modules/workers/dalpuri-sync.ts +++ b/api/src/modules/workers/dalpuri-sync.ts @@ -1,5 +1,6 @@ import { Socket } from "socket.io-client"; import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri"; +import { prisma } from "../../constants"; /** * Execute a full sync from Dalpuri (ConnectWise) to the API database. @@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise { * Called every 5 seconds via PgBoss from the API process interval. */ export async function executeIncrementalSync(): Promise { - return executeForcedIncrementalDalpuriSync(); + let jobRunId: string | undefined; + + try { + const run = await prisma.syncJobRun.create({ + data: { + jobType: "INCREMENTAL_SYNC", + status: "RUNNING", + triggeredBy: "worker", + startedAt: new Date(), + }, + select: { id: true }, + }); + jobRunId = run.id; + } catch (err) { + // Sync should still run even if tracking insert fails. + console.error("[sync] Failed to create incremental SyncJobRun", err); + } + + try { + await executeForcedIncrementalDalpuriSync({ jobRunId }); + + if (jobRunId) { + await prisma.syncJobRun.update({ + where: { id: jobRunId }, + data: { + status: "COMPLETED", + completedAt: new Date(), + }, + }); + } + } catch (err) { + if (jobRunId) { + const errorSummary = err instanceof Error ? err.message : String(err); + await prisma.syncJobRun + .update({ + where: { id: jobRunId }, + data: { + status: "FAILED", + completedAt: new Date(), + errorSummary: errorSummary.slice(0, 2000), + }, + }) + .catch(() => { + // Best-effort update only. + }); + } + + throw err; + } } diff --git a/dalpuri/src/sync.ts b/dalpuri/src/sync.ts index 1695dce..a983f2c 100644 --- a/dalpuri/src/sync.ts +++ b/dalpuri/src/sync.ts @@ -75,6 +75,43 @@ type DeleteResult = { let incrementalDeleteStepIndex = 0; +const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([ + "Companies", + "Company Addresses", + "Contacts", +]); + +const criticalFullSyncIntervalMinutes = Math.max( + 1, + Number.parseInt( + process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60", + 10 + ) || 60 +); + +const CRITICAL_FULL_SYNC_INTERVAL_MS = + criticalFullSyncIntervalMinutes * 60 * 1000; + +const lastCriticalFullSyncByStep = new Map(); + +const shouldForceCriticalFullSync = ( + step: Step, + forceIncremental: boolean +): boolean => { + if (!forceIncremental) return false; + if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false; + + const now = Date.now(); + const last = lastCriticalFullSyncByStep.get(step.name) ?? 0; + + if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) { + return false; + } + + lastCriticalFullSyncByStep.set(step.name, now); + return true; +}; + const parseEnvFile = (path: string): Record => { const envData = readFileSync(path, "utf8"); const out: Record = {}; @@ -1771,19 +1808,35 @@ export const executeFullDalpuriSync = async (options?: { step, forceIncremental ); + const forceCriticalFullSync = shouldForceCriticalFullSync( + step, + forceIncremental + ); + const effectiveDecision = forceCriticalFullSync + ? ({ mode: "full", differences: decision.differences } as SmartSyncDecision) + : decision; + + if (forceCriticalFullSync) { + console.log( + ` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m` + ); + } + const sourceIdsFilter = - decision.mode === "incremental" ? decision.sourceIds : undefined; + effectiveDecision.mode === "incremental" + ? effectiveDecision.sourceIds + : undefined; console.log( ` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${ - decision.mode + effectiveDecision.mode }${ - decision.mode === "incremental" - ? ` (${decision.sourceIds.length} ids)` + effectiveDecision.mode === "incremental" + ? ` (${effectiveDecision.sourceIds.length} ids)` : "" }` ); if (logAllDifferences) { - logAllSmartSyncDifferences(step, decision.differences); + logAllSmartSyncDifferences(step, effectiveDecision.differences); } const result = await syncStep( cwPrisma, @@ -1805,7 +1858,7 @@ export const executeFullDalpuriSync = async (options?: { await writeStepLog( step.name, - decision.mode, + effectiveDecision.mode, result, { deleted: 0, failed: 0 }, Date.now() - stepStart