Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 57b5763d41 | |||
| 2bd498a35d |
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function formatOpportunityContactName(
|
||||||
|
firstName?: string | null,
|
||||||
|
lastName?: string | null
|
||||||
|
): string {
|
||||||
|
const first = (firstName ?? "").trim();
|
||||||
|
const last = (lastName ?? "").trim();
|
||||||
|
|
||||||
|
if (first && last.toLowerCase() === "contact") {
|
||||||
|
return first;
|
||||||
|
}
|
||||||
|
|
||||||
|
return `${first} ${last}`.trim();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opportunity Controller
|
* Opportunity Controller
|
||||||
*
|
*
|
||||||
@@ -290,7 +304,7 @@ export class OpportunityController {
|
|||||||
| null
|
| null
|
||||||
| undefined;
|
| undefined;
|
||||||
this.contactName = (data as any).contactName ?? (contactRel
|
this.contactName = (data as any).contactName ?? (contactRel
|
||||||
? `${contactRel.firstName} ${contactRel.lastName}`.trim()
|
? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
|
||||||
: null);
|
: null);
|
||||||
|
|
||||||
// Site
|
// Site
|
||||||
@@ -674,7 +688,7 @@ export class OpportunityController {
|
|||||||
id: contact.id,
|
id: contact.id,
|
||||||
contact: {
|
contact: {
|
||||||
id: contact.id,
|
id: contact.id,
|
||||||
name: `${contact.firstName} ${contact.lastName}`.trim(),
|
name: formatOpportunityContactName(contact.firstName, contact.lastName),
|
||||||
},
|
},
|
||||||
company: contact.company
|
company: contact.company
|
||||||
? {
|
? {
|
||||||
|
|||||||
+101
-3
@@ -79,20 +79,46 @@ const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
|||||||
"Companies",
|
"Companies",
|
||||||
"Company Addresses",
|
"Company Addresses",
|
||||||
"Contacts",
|
"Contacts",
|
||||||
|
"Opportunities",
|
||||||
|
]);
|
||||||
|
|
||||||
|
const CRITICAL_CW_WATERMARK_TABLES = new Set([
|
||||||
|
"Companies",
|
||||||
|
"Company Addresses",
|
||||||
|
"Contacts",
|
||||||
|
"Opportunities",
|
||||||
]);
|
]);
|
||||||
|
|
||||||
const criticalFullSyncIntervalMinutes = Math.max(
|
const criticalFullSyncIntervalMinutes = Math.max(
|
||||||
1,
|
1,
|
||||||
Number.parseInt(
|
Number.parseInt(
|
||||||
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60",
|
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
|
||||||
10
|
10
|
||||||
) || 60
|
) || 15
|
||||||
);
|
);
|
||||||
|
|
||||||
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||||
criticalFullSyncIntervalMinutes * 60 * 1000;
|
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||||
|
|
||||||
|
const criticalCwWatermarkOverlapSeconds = Math.max(
|
||||||
|
5,
|
||||||
|
Number.parseInt(
|
||||||
|
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
|
||||||
|
10
|
||||||
|
) || 60
|
||||||
|
);
|
||||||
|
|
||||||
|
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
|
||||||
|
criticalCwWatermarkOverlapSeconds * 1000;
|
||||||
|
|
||||||
|
const criticalCwDeltaLimit = Math.max(
|
||||||
|
100,
|
||||||
|
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
|
||||||
|
5000
|
||||||
|
);
|
||||||
|
|
||||||
const lastCriticalFullSyncByStep = new Map<string, number>();
|
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||||
|
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
|
||||||
|
|
||||||
const shouldForceCriticalFullSync = (
|
const shouldForceCriticalFullSync = (
|
||||||
step: Step,
|
step: Step,
|
||||||
@@ -112,6 +138,73 @@ const shouldForceCriticalFullSync = (
|
|||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const computeCriticalCwWatermarkDecision = async (
|
||||||
|
cwPrisma: CwPrismaClient,
|
||||||
|
step: Step,
|
||||||
|
forceIncremental: boolean
|
||||||
|
): Promise<SmartSyncDecision | null> => {
|
||||||
|
if (!forceIncremental) return null;
|
||||||
|
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
|
||||||
|
|
||||||
|
const cwDelegate = (
|
||||||
|
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
||||||
|
)[step.sourceModel];
|
||||||
|
|
||||||
|
if (!cwDelegate) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const existingWhere =
|
||||||
|
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
|
||||||
|
|
||||||
|
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
|
||||||
|
const lowerBound = lastWatermark
|
||||||
|
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
|
||||||
|
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
|
||||||
|
|
||||||
|
const rows = (await cwDelegate.findMany({
|
||||||
|
select: {
|
||||||
|
[step.sourceIdField]: true,
|
||||||
|
[step.sourceUpdatedField]: true,
|
||||||
|
},
|
||||||
|
where: {
|
||||||
|
...(existingWhere as Record<string, unknown>),
|
||||||
|
[step.sourceUpdatedField]: {
|
||||||
|
gte: lowerBound,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
orderBy: { [step.sourceUpdatedField]: "asc" },
|
||||||
|
take: criticalCwDeltaLimit,
|
||||||
|
})) as Row[];
|
||||||
|
|
||||||
|
if (rows.length >= criticalCwDeltaLimit) {
|
||||||
|
console.warn(
|
||||||
|
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
|
||||||
|
);
|
||||||
|
return { mode: "full", differences: [] };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rows.length > 0) {
|
||||||
|
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
|
||||||
|
if (latest) {
|
||||||
|
lastCriticalCwWatermarkByStep.set(step.name, latest);
|
||||||
|
}
|
||||||
|
} else if (!lastWatermark) {
|
||||||
|
lastCriticalCwWatermarkByStep.set(step.name, new Date());
|
||||||
|
}
|
||||||
|
|
||||||
|
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
|
||||||
|
console.log(
|
||||||
|
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
|
||||||
|
);
|
||||||
|
|
||||||
|
return {
|
||||||
|
mode: "incremental",
|
||||||
|
sourceIds,
|
||||||
|
differences: [],
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
const parseEnvFile = (path: string): Record<string, string> => {
|
const parseEnvFile = (path: string): Record<string, string> => {
|
||||||
const envData = readFileSync(path, "utf8");
|
const envData = readFileSync(path, "utf8");
|
||||||
const out: Record<string, string> = {};
|
const out: Record<string, string> = {};
|
||||||
@@ -1808,13 +1901,18 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
step,
|
step,
|
||||||
forceIncremental
|
forceIncremental
|
||||||
);
|
);
|
||||||
|
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
|
||||||
|
cwPrisma,
|
||||||
|
step,
|
||||||
|
forceIncremental
|
||||||
|
);
|
||||||
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||||
step,
|
step,
|
||||||
forceIncremental
|
forceIncremental
|
||||||
);
|
);
|
||||||
const effectiveDecision = forceCriticalFullSync
|
const effectiveDecision = forceCriticalFullSync
|
||||||
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||||
: decision;
|
: criticalWatermarkDecision ?? decision;
|
||||||
|
|
||||||
if (forceCriticalFullSync) {
|
if (forceCriticalFullSync) {
|
||||||
console.log(
|
console.log(
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
|
|||||||
{
|
{
|
||||||
from: "lastName",
|
from: "lastName",
|
||||||
to: "lastName",
|
to: "lastName",
|
||||||
process: (value) => (value ? value : "Contact"),
|
process: (value) => (value ? value : ""),
|
||||||
},
|
},
|
||||||
{ from: "nickName", to: "nickname" },
|
{ from: "nickName", to: "nickname" },
|
||||||
{ from: "title", to: "title" },
|
{ from: "title", to: "title" },
|
||||||
|
|||||||
Reference in New Issue
Block a user