Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 86d7426e8b |
@@ -1,5 +1,6 @@
|
||||
import { Socket } from "socket.io-client";
|
||||
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
||||
import { prisma } from "../../constants";
|
||||
|
||||
/**
|
||||
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
||||
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
|
||||
* Called every 5 seconds via PgBoss from the API process interval.
|
||||
*/
|
||||
export async function executeIncrementalSync(): Promise<void> {
|
||||
return executeForcedIncrementalDalpuriSync();
|
||||
let jobRunId: string | undefined;
|
||||
|
||||
try {
|
||||
const run = await prisma.syncJobRun.create({
|
||||
data: {
|
||||
jobType: "INCREMENTAL_SYNC",
|
||||
status: "RUNNING",
|
||||
triggeredBy: "worker",
|
||||
startedAt: new Date(),
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
jobRunId = run.id;
|
||||
} catch (err) {
|
||||
// Sync should still run even if tracking insert fails.
|
||||
console.error("[sync] Failed to create incremental SyncJobRun", err);
|
||||
}
|
||||
|
||||
try {
|
||||
await executeForcedIncrementalDalpuriSync({ jobRunId });
|
||||
|
||||
if (jobRunId) {
|
||||
await prisma.syncJobRun.update({
|
||||
where: { id: jobRunId },
|
||||
data: {
|
||||
status: "COMPLETED",
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
if (jobRunId) {
|
||||
const errorSummary = err instanceof Error ? err.message : String(err);
|
||||
await prisma.syncJobRun
|
||||
.update({
|
||||
where: { id: jobRunId },
|
||||
data: {
|
||||
status: "FAILED",
|
||||
completedAt: new Date(),
|
||||
errorSummary: errorSummary.slice(0, 2000),
|
||||
},
|
||||
})
|
||||
.catch(() => {
|
||||
// Best-effort update only.
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
+59
-6
@@ -75,6 +75,43 @@ type DeleteResult = {
|
||||
|
||||
let incrementalDeleteStepIndex = 0;
|
||||
|
||||
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
]);
|
||||
|
||||
const criticalFullSyncIntervalMinutes = Math.max(
|
||||
1,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60",
|
||||
10
|
||||
) || 60
|
||||
);
|
||||
|
||||
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||
|
||||
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||
|
||||
const shouldForceCriticalFullSync = (
|
||||
step: Step,
|
||||
forceIncremental: boolean
|
||||
): boolean => {
|
||||
if (!forceIncremental) return false;
|
||||
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
|
||||
|
||||
const now = Date.now();
|
||||
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
|
||||
|
||||
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
lastCriticalFullSyncByStep.set(step.name, now);
|
||||
return true;
|
||||
};
|
||||
|
||||
const parseEnvFile = (path: string): Record<string, string> => {
|
||||
const envData = readFileSync(path, "utf8");
|
||||
const out: Record<string, string> = {};
|
||||
@@ -1771,19 +1808,35 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const effectiveDecision = forceCriticalFullSync
|
||||
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||
: decision;
|
||||
|
||||
if (forceCriticalFullSync) {
|
||||
console.log(
|
||||
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
|
||||
);
|
||||
}
|
||||
|
||||
const sourceIdsFilter =
|
||||
decision.mode === "incremental" ? decision.sourceIds : undefined;
|
||||
effectiveDecision.mode === "incremental"
|
||||
? effectiveDecision.sourceIds
|
||||
: undefined;
|
||||
console.log(
|
||||
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
||||
decision.mode
|
||||
effectiveDecision.mode
|
||||
}${
|
||||
decision.mode === "incremental"
|
||||
? ` (${decision.sourceIds.length} ids)`
|
||||
effectiveDecision.mode === "incremental"
|
||||
? ` (${effectiveDecision.sourceIds.length} ids)`
|
||||
: ""
|
||||
}`
|
||||
);
|
||||
if (logAllDifferences) {
|
||||
logAllSmartSyncDifferences(step, decision.differences);
|
||||
logAllSmartSyncDifferences(step, effectiveDecision.differences);
|
||||
}
|
||||
const result = await syncStep(
|
||||
cwPrisma,
|
||||
@@ -1805,7 +1858,7 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
|
||||
await writeStepLog(
|
||||
step.name,
|
||||
decision.mode,
|
||||
effectiveDecision.mode,
|
||||
result,
|
||||
{ deleted: 0, failed: 0 },
|
||||
Date.now() - stepStart
|
||||
|
||||
Reference in New Issue
Block a user