fix(sync): use CW watermark incremental path for critical tables
This commit is contained in:
+101
-3
@@ -79,20 +79,46 @@ const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
"Opportunities",
|
||||
]);
|
||||
|
||||
const CRITICAL_CW_WATERMARK_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
"Opportunities",
|
||||
]);
|
||||
|
||||
const criticalFullSyncIntervalMinutes = Math.max(
|
||||
1,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60",
|
||||
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
|
||||
10
|
||||
) || 60
|
||||
) || 15
|
||||
);
|
||||
|
||||
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||
|
||||
const criticalCwWatermarkOverlapSeconds = Math.max(
|
||||
5,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
|
||||
10
|
||||
) || 60
|
||||
);
|
||||
|
||||
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
|
||||
criticalCwWatermarkOverlapSeconds * 1000;
|
||||
|
||||
const criticalCwDeltaLimit = Math.max(
|
||||
100,
|
||||
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
|
||||
5000
|
||||
);
|
||||
|
||||
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
|
||||
|
||||
const shouldForceCriticalFullSync = (
|
||||
step: Step,
|
||||
@@ -112,6 +138,73 @@ const shouldForceCriticalFullSync = (
|
||||
return true;
|
||||
};
|
||||
|
||||
const computeCriticalCwWatermarkDecision = async (
|
||||
cwPrisma: CwPrismaClient,
|
||||
step: Step,
|
||||
forceIncremental: boolean
|
||||
): Promise<SmartSyncDecision | null> => {
|
||||
if (!forceIncremental) return null;
|
||||
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
|
||||
|
||||
const cwDelegate = (
|
||||
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
||||
)[step.sourceModel];
|
||||
|
||||
if (!cwDelegate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const existingWhere =
|
||||
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
|
||||
|
||||
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
|
||||
const lowerBound = lastWatermark
|
||||
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
|
||||
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
|
||||
|
||||
const rows = (await cwDelegate.findMany({
|
||||
select: {
|
||||
[step.sourceIdField]: true,
|
||||
[step.sourceUpdatedField]: true,
|
||||
},
|
||||
where: {
|
||||
...(existingWhere as Record<string, unknown>),
|
||||
[step.sourceUpdatedField]: {
|
||||
gte: lowerBound,
|
||||
},
|
||||
},
|
||||
orderBy: { [step.sourceUpdatedField]: "asc" },
|
||||
take: criticalCwDeltaLimit,
|
||||
})) as Row[];
|
||||
|
||||
if (rows.length >= criticalCwDeltaLimit) {
|
||||
console.warn(
|
||||
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
|
||||
);
|
||||
return { mode: "full", differences: [] };
|
||||
}
|
||||
|
||||
if (rows.length > 0) {
|
||||
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
|
||||
if (latest) {
|
||||
lastCriticalCwWatermarkByStep.set(step.name, latest);
|
||||
}
|
||||
} else if (!lastWatermark) {
|
||||
lastCriticalCwWatermarkByStep.set(step.name, new Date());
|
||||
}
|
||||
|
||||
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
|
||||
console.log(
|
||||
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
|
||||
);
|
||||
|
||||
return {
|
||||
mode: "incremental",
|
||||
sourceIds,
|
||||
differences: [],
|
||||
};
|
||||
};
|
||||
|
||||
const parseEnvFile = (path: string): Record<string, string> => {
|
||||
const envData = readFileSync(path, "utf8");
|
||||
const out: Record<string, string> = {};
|
||||
@@ -1808,13 +1901,18 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
|
||||
cwPrisma,
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const effectiveDecision = forceCriticalFullSync
|
||||
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||
: decision;
|
||||
: criticalWatermarkDecision ?? decision;
|
||||
|
||||
if (forceCriticalFullSync) {
|
||||
console.log(
|
||||
|
||||
Reference in New Issue
Block a user