fix(sync): harden incremental observability and periodic reconciliation

This commit is contained in:
2026-04-10 04:36:36 +00:00
parent afe56393e7
commit 86d7426e8b
2 changed files with 109 additions and 7 deletions
+50 -1
View File
@@ -1,5 +1,6 @@
import { Socket } from "socket.io-client"; import { Socket } from "socket.io-client";
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri"; import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
import { prisma } from "../../constants";
/** /**
* Execute a full sync from Dalpuri (ConnectWise) to the API database. * Execute a full sync from Dalpuri (ConnectWise) to the API database.
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
* Called every 5 seconds via PgBoss from the API process interval. * Called every 5 seconds via PgBoss from the API process interval.
*/ */
export async function executeIncrementalSync(): Promise<void> { export async function executeIncrementalSync(): Promise<void> {
return executeForcedIncrementalDalpuriSync(); let jobRunId: string | undefined;
try {
const run = await prisma.syncJobRun.create({
data: {
jobType: "INCREMENTAL_SYNC",
status: "RUNNING",
triggeredBy: "worker",
startedAt: new Date(),
},
select: { id: true },
});
jobRunId = run.id;
} catch (err) {
// Sync should still run even if tracking insert fails.
console.error("[sync] Failed to create incremental SyncJobRun", err);
}
try {
await executeForcedIncrementalDalpuriSync({ jobRunId });
if (jobRunId) {
await prisma.syncJobRun.update({
where: { id: jobRunId },
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});
}
} catch (err) {
if (jobRunId) {
const errorSummary = err instanceof Error ? err.message : String(err);
await prisma.syncJobRun
.update({
where: { id: jobRunId },
data: {
status: "FAILED",
completedAt: new Date(),
errorSummary: errorSummary.slice(0, 2000),
},
})
.catch(() => {
// Best-effort update only.
});
}
throw err;
}
} }
+59 -6
View File
@@ -75,6 +75,43 @@ type DeleteResult = {
let incrementalDeleteStepIndex = 0; let incrementalDeleteStepIndex = 0;
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
]);
const criticalFullSyncIntervalMinutes = Math.max(
1,
Number.parseInt(
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60",
10
) || 60
);
const CRITICAL_FULL_SYNC_INTERVAL_MS =
criticalFullSyncIntervalMinutes * 60 * 1000;
const lastCriticalFullSyncByStep = new Map<string, number>();
const shouldForceCriticalFullSync = (
step: Step,
forceIncremental: boolean
): boolean => {
if (!forceIncremental) return false;
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
const now = Date.now();
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
return false;
}
lastCriticalFullSyncByStep.set(step.name, now);
return true;
};
const parseEnvFile = (path: string): Record<string, string> => { const parseEnvFile = (path: string): Record<string, string> => {
const envData = readFileSync(path, "utf8"); const envData = readFileSync(path, "utf8");
const out: Record<string, string> = {}; const out: Record<string, string> = {};
@@ -1771,19 +1808,35 @@ export const executeFullDalpuriSync = async (options?: {
step, step,
forceIncremental forceIncremental
); );
const forceCriticalFullSync = shouldForceCriticalFullSync(
step,
forceIncremental
);
const effectiveDecision = forceCriticalFullSync
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
: decision;
if (forceCriticalFullSync) {
console.log(
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
);
}
const sourceIdsFilter = const sourceIdsFilter =
decision.mode === "incremental" ? decision.sourceIds : undefined; effectiveDecision.mode === "incremental"
? effectiveDecision.sourceIds
: undefined;
console.log( console.log(
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${ ` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
decision.mode effectiveDecision.mode
}${ }${
decision.mode === "incremental" effectiveDecision.mode === "incremental"
? ` (${decision.sourceIds.length} ids)` ? ` (${effectiveDecision.sourceIds.length} ids)`
: "" : ""
}` }`
); );
if (logAllDifferences) { if (logAllDifferences) {
logAllSmartSyncDifferences(step, decision.differences); logAllSmartSyncDifferences(step, effectiveDecision.differences);
} }
const result = await syncStep( const result = await syncStep(
cwPrisma, cwPrisma,
@@ -1805,7 +1858,7 @@ export const executeFullDalpuriSync = async (options?: {
await writeStepLog( await writeStepLog(
step.name, step.name,
decision.mode, effectiveDecision.mode,
result, result,
{ deleted: 0, failed: 0 }, { deleted: 0, failed: 0 },
Date.now() - stepStart Date.now() - stepStart