Compare commits

..

2 Commits

3 changed files with 118 additions and 6 deletions
+16 -2
View File
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
return null; return null;
} }
function formatOpportunityContactName(
firstName?: string | null,
lastName?: string | null
): string {
const first = (firstName ?? "").trim();
const last = (lastName ?? "").trim();
if (first && last.toLowerCase() === "contact") {
return first;
}
return `${first} ${last}`.trim();
}
/** /**
* Opportunity Controller * Opportunity Controller
* *
@@ -290,7 +304,7 @@ export class OpportunityController {
| null | null
| undefined; | undefined;
this.contactName = (data as any).contactName ?? (contactRel this.contactName = (data as any).contactName ?? (contactRel
? `${contactRel.firstName} ${contactRel.lastName}`.trim() ? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
: null); : null);
// Site // Site
@@ -674,7 +688,7 @@ export class OpportunityController {
id: contact.id, id: contact.id,
contact: { contact: {
id: contact.id, id: contact.id,
name: `${contact.firstName} ${contact.lastName}`.trim(), name: formatOpportunityContactName(contact.firstName, contact.lastName),
}, },
company: contact.company company: contact.company
? { ? {
+101 -3
View File
@@ -79,20 +79,46 @@ const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
"Companies", "Companies",
"Company Addresses", "Company Addresses",
"Contacts", "Contacts",
"Opportunities",
]);
const CRITICAL_CW_WATERMARK_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]); ]);
const criticalFullSyncIntervalMinutes = Math.max( const criticalFullSyncIntervalMinutes = Math.max(
1, 1,
Number.parseInt( Number.parseInt(
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60", process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
10 10
) || 60 ) || 15
); );
const CRITICAL_FULL_SYNC_INTERVAL_MS = const CRITICAL_FULL_SYNC_INTERVAL_MS =
criticalFullSyncIntervalMinutes * 60 * 1000; criticalFullSyncIntervalMinutes * 60 * 1000;
const criticalCwWatermarkOverlapSeconds = Math.max(
5,
Number.parseInt(
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
10
) || 60
);
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
criticalCwWatermarkOverlapSeconds * 1000;
const criticalCwDeltaLimit = Math.max(
100,
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
5000
);
const lastCriticalFullSyncByStep = new Map<string, number>(); const lastCriticalFullSyncByStep = new Map<string, number>();
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
const shouldForceCriticalFullSync = ( const shouldForceCriticalFullSync = (
step: Step, step: Step,
@@ -112,6 +138,73 @@ const shouldForceCriticalFullSync = (
return true; return true;
}; };
const computeCriticalCwWatermarkDecision = async (
cwPrisma: CwPrismaClient,
step: Step,
forceIncremental: boolean
): Promise<SmartSyncDecision | null> => {
if (!forceIncremental) return null;
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
const cwDelegate = (
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
)[step.sourceModel];
if (!cwDelegate) {
return null;
}
const existingWhere =
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
const lowerBound = lastWatermark
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
const rows = (await cwDelegate.findMany({
select: {
[step.sourceIdField]: true,
[step.sourceUpdatedField]: true,
},
where: {
...(existingWhere as Record<string, unknown>),
[step.sourceUpdatedField]: {
gte: lowerBound,
},
},
orderBy: { [step.sourceUpdatedField]: "asc" },
take: criticalCwDeltaLimit,
})) as Row[];
if (rows.length >= criticalCwDeltaLimit) {
console.warn(
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
);
return { mode: "full", differences: [] };
}
if (rows.length > 0) {
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
if (latest) {
lastCriticalCwWatermarkByStep.set(step.name, latest);
}
} else if (!lastWatermark) {
lastCriticalCwWatermarkByStep.set(step.name, new Date());
}
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
console.log(
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
);
return {
mode: "incremental",
sourceIds,
differences: [],
};
};
const parseEnvFile = (path: string): Record<string, string> => { const parseEnvFile = (path: string): Record<string, string> => {
const envData = readFileSync(path, "utf8"); const envData = readFileSync(path, "utf8");
const out: Record<string, string> = {}; const out: Record<string, string> = {};
@@ -1808,13 +1901,18 @@ export const executeFullDalpuriSync = async (options?: {
step, step,
forceIncremental forceIncremental
); );
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
cwPrisma,
step,
forceIncremental
);
const forceCriticalFullSync = shouldForceCriticalFullSync( const forceCriticalFullSync = shouldForceCriticalFullSync(
step, step,
forceIncremental forceIncremental
); );
const effectiveDecision = forceCriticalFullSync const effectiveDecision = forceCriticalFullSync
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision) ? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
: decision; : criticalWatermarkDecision ?? decision;
if (forceCriticalFullSync) { if (forceCriticalFullSync) {
console.log( console.log(
+1 -1
View File
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
{ {
from: "lastName", from: "lastName",
to: "lastName", to: "lastName",
process: (value) => (value ? value : "Contact"), process: (value) => (value ? value : ""),
}, },
{ from: "nickName", to: "nickname" }, { from: "nickName", to: "nickname" },
{ from: "title", to: "title" }, { from: "title", to: "title" },