Compare commits

...

3 Commits

4 changed files with 224 additions and 10 deletions
+16 -2
View File
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
return null; return null;
} }
function formatOpportunityContactName(
firstName?: string | null,
lastName?: string | null
): string {
const first = (firstName ?? "").trim();
const last = (lastName ?? "").trim();
if (first && last.toLowerCase() === "contact") {
return first;
}
return `${first} ${last}`.trim();
}
/** /**
* Opportunity Controller * Opportunity Controller
* *
@@ -290,7 +304,7 @@ export class OpportunityController {
| null | null
| undefined; | undefined;
this.contactName = (data as any).contactName ?? (contactRel this.contactName = (data as any).contactName ?? (contactRel
? `${contactRel.firstName} ${contactRel.lastName}`.trim() ? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
: null); : null);
// Site // Site
@@ -674,7 +688,7 @@ export class OpportunityController {
id: contact.id, id: contact.id,
contact: { contact: {
id: contact.id, id: contact.id,
name: `${contact.firstName} ${contact.lastName}`.trim(), name: formatOpportunityContactName(contact.firstName, contact.lastName),
}, },
company: contact.company company: contact.company
? { ? {
+50 -1
View File
@@ -1,5 +1,6 @@
import { Socket } from "socket.io-client"; import { Socket } from "socket.io-client";
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri"; import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
import { prisma } from "../../constants";
/** /**
* Execute a full sync from Dalpuri (ConnectWise) to the API database. * Execute a full sync from Dalpuri (ConnectWise) to the API database.
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
* Called every 5 seconds via PgBoss from the API process interval. * Called every 5 seconds via PgBoss from the API process interval.
*/ */
export async function executeIncrementalSync(): Promise<void> { export async function executeIncrementalSync(): Promise<void> {
return executeForcedIncrementalDalpuriSync(); let jobRunId: string | undefined;
try {
const run = await prisma.syncJobRun.create({
data: {
jobType: "INCREMENTAL_SYNC",
status: "RUNNING",
triggeredBy: "worker",
startedAt: new Date(),
},
select: { id: true },
});
jobRunId = run.id;
} catch (err) {
// Sync should still run even if tracking insert fails.
console.error("[sync] Failed to create incremental SyncJobRun", err);
}
try {
await executeForcedIncrementalDalpuriSync({ jobRunId });
if (jobRunId) {
await prisma.syncJobRun.update({
where: { id: jobRunId },
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});
}
} catch (err) {
if (jobRunId) {
const errorSummary = err instanceof Error ? err.message : String(err);
await prisma.syncJobRun
.update({
where: { id: jobRunId },
data: {
status: "FAILED",
completedAt: new Date(),
errorSummary: errorSummary.slice(0, 2000),
},
})
.catch(() => {
// Best-effort update only.
});
}
throw err;
}
} }
+157 -6
View File
@@ -75,6 +75,136 @@ type DeleteResult = {
let incrementalDeleteStepIndex = 0; let incrementalDeleteStepIndex = 0;
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]);
const CRITICAL_CW_WATERMARK_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]);
const criticalFullSyncIntervalMinutes = Math.max(
1,
Number.parseInt(
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
10
) || 15
);
const CRITICAL_FULL_SYNC_INTERVAL_MS =
criticalFullSyncIntervalMinutes * 60 * 1000;
const criticalCwWatermarkOverlapSeconds = Math.max(
5,
Number.parseInt(
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
10
) || 60
);
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
criticalCwWatermarkOverlapSeconds * 1000;
const criticalCwDeltaLimit = Math.max(
100,
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
5000
);
const lastCriticalFullSyncByStep = new Map<string, number>();
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
const shouldForceCriticalFullSync = (
step: Step,
forceIncremental: boolean
): boolean => {
if (!forceIncremental) return false;
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
const now = Date.now();
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
return false;
}
lastCriticalFullSyncByStep.set(step.name, now);
return true;
};
const computeCriticalCwWatermarkDecision = async (
cwPrisma: CwPrismaClient,
step: Step,
forceIncremental: boolean
): Promise<SmartSyncDecision | null> => {
if (!forceIncremental) return null;
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
const cwDelegate = (
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
)[step.sourceModel];
if (!cwDelegate) {
return null;
}
const existingWhere =
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
const lowerBound = lastWatermark
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
const rows = (await cwDelegate.findMany({
select: {
[step.sourceIdField]: true,
[step.sourceUpdatedField]: true,
},
where: {
...(existingWhere as Record<string, unknown>),
[step.sourceUpdatedField]: {
gte: lowerBound,
},
},
orderBy: { [step.sourceUpdatedField]: "asc" },
take: criticalCwDeltaLimit,
})) as Row[];
if (rows.length >= criticalCwDeltaLimit) {
console.warn(
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
);
return { mode: "full", differences: [] };
}
if (rows.length > 0) {
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
if (latest) {
lastCriticalCwWatermarkByStep.set(step.name, latest);
}
} else if (!lastWatermark) {
lastCriticalCwWatermarkByStep.set(step.name, new Date());
}
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
console.log(
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
);
return {
mode: "incremental",
sourceIds,
differences: [],
};
};
const parseEnvFile = (path: string): Record<string, string> => { const parseEnvFile = (path: string): Record<string, string> => {
const envData = readFileSync(path, "utf8"); const envData = readFileSync(path, "utf8");
const out: Record<string, string> = {}; const out: Record<string, string> = {};
@@ -1771,19 +1901,40 @@ export const executeFullDalpuriSync = async (options?: {
step, step,
forceIncremental forceIncremental
); );
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
cwPrisma,
step,
forceIncremental
);
const forceCriticalFullSync = shouldForceCriticalFullSync(
step,
forceIncremental
);
const effectiveDecision = forceCriticalFullSync
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
: criticalWatermarkDecision ?? decision;
if (forceCriticalFullSync) {
console.log(
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
);
}
const sourceIdsFilter = const sourceIdsFilter =
decision.mode === "incremental" ? decision.sourceIds : undefined; effectiveDecision.mode === "incremental"
? effectiveDecision.sourceIds
: undefined;
console.log( console.log(
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${ ` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
decision.mode effectiveDecision.mode
}${ }${
decision.mode === "incremental" effectiveDecision.mode === "incremental"
? ` (${decision.sourceIds.length} ids)` ? ` (${effectiveDecision.sourceIds.length} ids)`
: "" : ""
}` }`
); );
if (logAllDifferences) { if (logAllDifferences) {
logAllSmartSyncDifferences(step, decision.differences); logAllSmartSyncDifferences(step, effectiveDecision.differences);
} }
const result = await syncStep( const result = await syncStep(
cwPrisma, cwPrisma,
@@ -1805,7 +1956,7 @@ export const executeFullDalpuriSync = async (options?: {
await writeStepLog( await writeStepLog(
step.name, step.name,
decision.mode, effectiveDecision.mode,
result, result,
{ deleted: 0, failed: 0 }, { deleted: 0, failed: 0 },
Date.now() - stepStart Date.now() - stepStart
+1 -1
View File
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
{ {
from: "lastName", from: "lastName",
to: "lastName", to: "lastName",
process: (value) => (value ? value : "Contact"), process: (value) => (value ? value : ""),
}, },
{ from: "nickName", to: "nickname" }, { from: "nickName", to: "nickname" },
{ from: "title", to: "title" }, { from: "title", to: "title" },