Files
optima/dalpuri/src/sync.ts
T

2065 lines
57 KiB
TypeScript

import { PrismaMssql } from "@prisma/adapter-mssql";
import { PrismaPg } from "@prisma/adapter-pg";
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { PrismaClient as CwPrismaClient } from "../generated/prisma/client";
import { PrismaClient as ApiPrismaClient } from "../../api/generated/prisma/client";
import {
catalogCategoryTranslation,
catalogItemTranslation,
catalogItemTypeTranslation,
catalogManufacturerTranslation,
catalogSubcategoryTranslation,
companyAddressTranslation,
companyTranslation,
contactTranslation,
corporateLocationTranslation,
createTranslationContext,
cwMemberTranslation,
internalDepartmentTranslation,
opportunityStageTranslation,
opportunityStatusTranslation,
opportunityTranslation,
opportunityTypeTranslation,
productDataTranslation,
productInventoryTranslation,
serviceTicketBoardTranslation,
serviceTicketImpactTranslation,
serviceTicketLocationTranslation,
serviceTicketNoteTranslation,
serviceTicketPriorityTranslation,
serviceTicketSeverityTranslation,
serviceTicketSourceTranslation,
serviceTicketTranslation,
serviceTicketTypeTranslation,
scheduleStatusTranslation,
scheduleTypeTranslation,
scheduleSpanTranslation,
scheduleTranslation,
taxCodeTranslation,
userTranslation,
warehouseBinTranslation,
type TranslationContext,
} from "./index";
import { Translation, SkipRowError } from "./translations/types";
/** Loosely-typed record used both for rows read from the source and for payloads written to the target. */
type Row = Record<string, unknown>;
/** A translation with its row types widened to `Row`, so steps for different models share one shape. */
type AnyTranslation = Translation<Row, Row, TranslationContext>;
/** Describes how one source model is copied into one target model. */
type Step = {
/** Step label; used in log output and matched against the critical-table name sets. */
name: string;
/** Property name of the model delegate looked up on the source (CW) Prisma client. */
sourceModel: string;
/** Property name of the model delegate looked up on the target (API) Prisma client. */
targetModel: string;
/** Field mapping applied to every source row before it is written. */
translation: AnyTranslation;
/** Target-side field used as the `where` key for upserts and as the comparison key for delete reconciliation. */
uniqueField: string;
/** Source-side id field; selected for comparisons and used to filter incremental fetches. */
sourceIdField: string;
/** Source-side timestamp field used for ordering and change detection. */
sourceUpdatedField: string;
/** Extra arguments spread into source `findMany` calls; its `where` is also reused by count, delta and reconcile queries. */
sourceArgs?: Record<string, unknown>;
/** Optional filter limiting which target rows delete reconciliation inspects and bulk-deletes. */
targetDeleteWhere?: Record<string, unknown>;
/** API-side field used for timestamp comparison in smart-sync. Defaults to "updatedAt". Set to null for tables without timestamps. */
targetUpdatedField?: string | null;
};
/** Per-step counters returned by `syncStep`. */
type StepResult = {
/** Rows for which the target upsert completed. */
insertedOrUpdated: number;
/** Rows skipped because the unique value was empty or a `SkipRowError` was thrown. */
skipped: number;
/** Rows whose translation or upsert threw any other error. */
failed: number;
};
/** Counters returned by `reconcileStepDeletes`. */
type DeleteResult = {
/** Target rows removed (bulk count, or one per successful single-row delete). */
deleted: number;
/** Target rows whose single-row delete threw. */
failed: number;
};
// NOTE(review): only initialised in this part of the file; its readers/writers are not visible here.
let incrementalDeleteStepIndex = 0;

/** Step names that are periodically promoted to a full sync while running in forced-incremental mode. */
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set<string>([
  "Companies",
  "Company Addresses",
  "Contacts",
  "Opportunities",
]);

/** Step names whose incremental id list is derived from a source-side timestamp watermark. */
const CRITICAL_CW_WATERMARK_TABLES = new Set<string>([
  "Companies",
  "Company Addresses",
  "Contacts",
  "Opportunities",
]);

/**
 * Reads a base-10 integer from an environment variable.
 * Unset, unparsable and zero values all resolve to `fallback`; the result is never below `floor`.
 */
const readEnvIntWithFloor = (
  envKey: string,
  fallback: number,
  floor: number
): number => {
  const parsed = Number.parseInt(process.env[envKey] ?? String(fallback), 10);
  return Math.max(floor, parsed || fallback);
};

/** Minimum minutes between promoted full syncs of a critical step (default 15, at least 1). */
const criticalFullSyncIntervalMinutes = readEnvIntWithFloor(
  "DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES",
  15,
  1
);
const CRITICAL_FULL_SYNC_INTERVAL_MS =
  criticalFullSyncIntervalMinutes * 60 * 1000;

/** Seconds subtracted from the watermark so consecutive delta windows overlap (default 60, at least 5). */
const criticalCwWatermarkOverlapSeconds = readEnvIntWithFloor(
  "DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS",
  60,
  5
);
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
  criticalCwWatermarkOverlapSeconds * 1000;

/** Maximum rows fetched per watermark delta query (default 5000, at least 100). */
const criticalCwDeltaLimit = readEnvIntWithFloor(
  "DALPURI_CRITICAL_CW_DELTA_LIMIT",
  5000,
  100
);

/** Step name -> epoch milliseconds of the last promoted full sync. */
const lastCriticalFullSyncByStep = new Map<string, number>();
/** Step name -> newest source timestamp already handed out as a delta. */
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
/**
 * Decides whether a critical step should be promoted to a full sync during a forced-incremental run.
 *
 * Returns true at most once per `CRITICAL_FULL_SYNC_INTERVAL_MS` for each step name, and records
 * the current time for the step whenever it returns true.
 *
 * @param step Step being scheduled; only its `name` is consulted.
 * @param forceIncremental Whether the caller is running in forced-incremental mode.
 * @returns true when the step is critical and its interval has elapsed.
 */
const shouldForceCriticalFullSync = (
  step: Step,
  forceIncremental: boolean
): boolean => {
  const eligible =
    forceIncremental && CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name);
  if (!eligible) return false;

  const currentMs = Date.now();
  // A step that has never been promoted is treated as last promoted at the epoch, so it is due immediately.
  const previousMs = lastCriticalFullSyncByStep.get(step.name) ?? 0;
  const intervalElapsed =
    currentMs - previousMs >= CRITICAL_FULL_SYNC_INTERVAL_MS;
  if (intervalElapsed) {
    lastCriticalFullSyncByStep.set(step.name, currentMs);
  }
  return intervalElapsed;
};
/**
 * Builds an incremental decision for a critical step from a source-side timestamp watermark.
 *
 * Queries the source for rows whose `sourceUpdatedField` is at or after the last recorded
 * watermark minus the configured overlap (or "now minus overlap" when no watermark exists yet),
 * and returns their ids as the rows to sync.
 *
 * @param cwPrisma Source Prisma client; the delegate is looked up by `step.sourceModel`.
 * @param step Step being planned.
 * @param forceIncremental Watermarking only applies in forced-incremental mode.
 * @returns null when watermarking does not apply (not forced-incremental, non-critical step, or
 *   unknown delegate); a "full" decision when the delta reaches `criticalCwDeltaLimit`;
 *   otherwise an "incremental" decision with the changed ids.
 */
const computeCriticalCwWatermarkDecision = async (
  cwPrisma: CwPrismaClient,
  step: Step,
  forceIncremental: boolean
): Promise<SmartSyncDecision | null> => {
  if (!forceIncremental) return null;
  if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
  const cwDelegate = (
    cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
  )[step.sourceModel];
  if (!cwDelegate) {
    return null;
  }
  const existingWhere =
    (step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
  const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
  const lowerBound = lastWatermark
    ? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
    : new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
  const deltaWhere = {
    ...(existingWhere as Record<string, unknown>),
    [step.sourceUpdatedField]: {
      gte: lowerBound,
    },
  };
  const rows = (await cwDelegate.findMany({
    select: {
      [step.sourceIdField]: true,
      [step.sourceUpdatedField]: true,
    },
    where: deltaWhere,
    orderBy: { [step.sourceUpdatedField]: "asc" },
    take: criticalCwDeltaLimit,
  })) as Row[];
  if (rows.length >= criticalCwDeltaLimit) {
    console.warn(
      ` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
    );
    // The fetched window was truncated, so its last row is not the newest change. Move the
    // watermark to the newest source timestamp instead; otherwise every later run would
    // re-query from the same lower bound, overflow again, and force a full sync indefinitely.
    const newestRows = (await cwDelegate.findMany({
      select: {
        [step.sourceUpdatedField]: true,
      },
      where: deltaWhere,
      orderBy: { [step.sourceUpdatedField]: "desc" },
      take: 1,
    })) as Row[];
    const newest = newestRows[0]?.[step.sourceUpdatedField] as
      | Date
      | null
      | undefined;
    if (newest) {
      lastCriticalCwWatermarkByStep.set(step.name, newest);
    }
    return { mode: "full", differences: [] };
  }
  if (rows.length > 0) {
    // Rows are ordered ascending, so the last row carries the newest timestamp in the window.
    const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
    if (latest) {
      lastCriticalCwWatermarkByStep.set(step.name, latest);
    }
  } else if (!lastWatermark) {
    // First run with nothing to report: seed the watermark so later windows start from here.
    lastCriticalCwWatermarkByStep.set(step.name, new Date());
  }
  const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
  console.log(
    ` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
  );
  return {
    mode: "incremental",
    sourceIds,
    differences: [],
  };
};
/**
 * Minimal `.env` reader: returns the `KEY=value` pairs found in the file at `path`.
 *
 * - Blank lines, lines starting with `#`, and lines without a key before `=` are ignored.
 * - A leading `export ` on the key is dropped, so `export KEY=value` yields `KEY`.
 * - Values wrapped in one matching pair of single or double quotes are unwrapped; a value
 *   that is only a single quote character is kept as-is.
 * - Later duplicates overwrite earlier ones. Escapes and inline comments are not interpreted.
 *
 * @param path File to read as UTF-8.
 * @returns Parsed key/value pairs.
 * @throws Whatever `readFileSync` throws when the file cannot be read.
 */
const parseEnvFile = (path: string): Record<string, string> => {
  const envData = readFileSync(path, "utf8");
  const out: Record<string, string> = {};
  for (const rawLine of envData.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (!line || line.startsWith("#")) continue;
    const index = line.indexOf("=");
    if (index <= 0) continue;
    const key = line
      .slice(0, index)
      .replace(/^export\s+/, "")
      .trim();
    if (!key) continue;
    let value = line.slice(index + 1).trim();
    const quote = value[0];
    // Require two characters so a lone quote is not treated as an empty quoted string.
    if (
      value.length >= 2 &&
      (quote === '"' || quote === "'") &&
      value.endsWith(quote)
    ) {
      value = value.slice(1, -1);
    }
    out[key] = value;
  }
  return out;
};
/**
 * Resolves the connection string for the API database.
 *
 * Lookup order:
 * 1. `API_DATABASE_URL`
 * 2. `OPTIMA_API_DATABASE_URL`
 * 3. `DATABASE_URL`, when `CW_DATABASE_URL` is also set or the URL has a postgres scheme
 * 4. `DATABASE_URL` from the first readable `../api/.env` candidate that defines it
 *
 * @returns The connection string, or "" when none of the sources yields one.
 */
const resolveApiDatabaseUrl = (): string => {
  const {
    API_DATABASE_URL: explicitUrl,
    OPTIMA_API_DATABASE_URL: optimaUrl,
    CW_DATABASE_URL: cwUrl,
    DATABASE_URL: sharedUrl,
  } = process.env;

  if (explicitUrl) return explicitUrl;
  if (optimaUrl) return optimaUrl;

  // Worker/runtime fallback:
  // In Kubernetes CW is often provided via CW_DATABASE_URL and API Postgres via
  // DATABASE_URL. DATABASE_URL is only treated as the API database when that
  // can be inferred: either CW has its own variable, or the scheme is postgres.
  if (sharedUrl) {
    const cwHasOwnVariable = Boolean(cwUrl);
    const hasPostgresScheme = /^(postgres|postgresql):\/\//i.test(sharedUrl);
    if (cwHasOwnVariable || hasPostgresScheme) {
      return sharedUrl;
    }
  }

  const envFileCandidates = [
    resolve(import.meta.dir, "../../api/.env"),
    resolve(process.cwd(), "../api/.env"),
  ];
  for (const candidatePath of envFileCandidates) {
    let fileUrl: string | undefined;
    try {
      fileUrl = parseEnvFile(candidatePath).DATABASE_URL;
    } catch {
      // Unreadable candidate: try the next path.
      continue;
    }
    if (fileUrl) {
      return fileUrl;
    }
  }
  return "";
};
/**
 * Reads an environment variable holding a JSON object of `{ "<integer id>": "<text>" }`
 * and returns it as a `Map<number, string>`.
 *
 * Entries whose key is not a plain base-10 integer, or whose value is not a string, are
 * dropped. Anything that is not a JSON object (invalid JSON, arrays, strings, numbers,
 * null) is reported with a warning and yields an empty map.
 *
 * @param envKey Name of the environment variable to read.
 * @returns The parsed map; empty when the variable is unset, empty, or unusable.
 */
const parseJsonMapFromEnv = (envKey: string): Map<number, string> => {
  const raw = process.env[envKey];
  if (!raw) return new Map();
  try {
    const parsed: unknown = JSON.parse(raw);
    // Object.entries would enumerate the indices of a string or array, so reject those
    // explicitly instead of producing an index -> character/element map.
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      throw new TypeError("Expected a JSON object keyed by numeric id.");
    }
    const out = new Map<number, string>();
    for (const [key, value] of Object.entries(parsed)) {
      // parseInt alone would accept trailing garbage such as "12abc".
      if (!/^\s*[+-]?\d+\s*$/.test(key)) continue;
      const id = Number.parseInt(key, 10);
      if (Number.isInteger(id) && typeof value === "string") {
        out.set(id, value);
      }
    }
    return out;
  } catch (error) {
    console.warn(`Could not parse ${envKey} as JSON map. Ignoring.`, error);
    return new Map();
  }
};
/**
 * Returns a copy of `data` without the properties whose value is `undefined`.
 * `null` and other falsy values are kept; the input object is not modified.
 */
const normalizeData = (data: Row): Row =>
  Object.entries(data).reduce<Row>((kept, [field, fieldValue]) => {
    if (fieldValue !== undefined) {
      kept[field] = fieldValue;
    }
    return kept;
  }, {});
/**
 * Applies a translation's field mappings to one source row.
 *
 * For every mapping whose source field is present (not `undefined`) on the row, the value is
 * passed through the mapping's `process` function when it has one, otherwise copied as-is, and
 * stored under the mapping's target key. The result is passed through `normalizeData`.
 *
 * @param row Source row.
 * @param translation Mapping definition to apply.
 * @param context Shared context handed to each `process` function.
 * @returns The translated, normalized row.
 */
const translateRow = (
  row: Row,
  translation: AnyTranslation,
  context: TranslationContext
): Row => {
  const mapped: Row = {};
  for (const mapping of translation.values) {
    const sourceValue = row[mapping.from as string];
    if (sourceValue === undefined) {
      continue;
    }
    let targetValue: unknown = sourceValue;
    if (mapping.process) {
      // Called as a method on the mapping so any `this` use inside `process` keeps working.
      targetValue = mapping.process(sourceValue as never, context, row);
    }
    mapped[mapping.to as string] = targetValue;
  }
  return normalizeData(mapped);
};
/**
 * Rebuilds the lookup sets and maps on `context` from the current contents of the API database.
 *
 * All reads are issued together and the context is only touched once every read has
 * succeeded, so a rejected query leaves the previous context intact instead of empty.
 *
 * Populated from the API: the id sets (users, service tickets, opportunities, catalog items,
 * corporate locations, opportunity statuses/stages, companies, company addresses, contacts,
 * schedule statuses/types/spans, tax codes), the user lookup maps, the board uid map and the
 * default opportunity type (lowest id). The two billing maps are re-read from the
 * `BILLING_TYPE_CONTEXT_JSON` and `BILLING_INSTRUCTIONS_CONTEXT_JSON` environment variables.
 *
 * @param apiPrisma Client for the API database.
 * @param context Translation context mutated in place.
 * @throws Rejects with the underlying error when any query fails.
 */
const refreshContextFromApi = async (
  apiPrisma: ApiPrismaClient,
  context: TranslationContext
): Promise<void> => {
  const [
    users,
    boards,
    serviceTickets,
    opportunities,
    catalogItems,
    locations,
    opportunityStatuses,
    companies,
    companyAddresses,
    contacts,
    opportunityStages,
    scheduleStatuses,
    scheduleTypes,
    scheduleSpans,
    taxCodes,
    defaultOpportunityType,
    cwMembers,
  ] = await Promise.all([
    apiPrisma.user.findMany({
      select: { id: true, cwMemberId: true, cwIdentifier: true },
    }),
    apiPrisma.serviceTicketBoard.findMany({ select: { id: true, uid: true } }),
    apiPrisma.serviceTicket.findMany({ select: { id: true } }),
    apiPrisma.opportunity.findMany({ select: { id: true } }),
    apiPrisma.catalogItem.findMany({ select: { id: true } }),
    apiPrisma.corporateLocation.findMany({ select: { id: true } }),
    apiPrisma.opportunityStatus.findMany({ select: { id: true } }),
    apiPrisma.company.findMany({ select: { id: true } }),
    apiPrisma.companyAddress.findMany({ select: { id: true } }),
    apiPrisma.contact.findMany({ select: { id: true } }),
    apiPrisma.opportunityStage.findMany({ select: { id: true } }),
    apiPrisma.scheduleStatus.findMany({ select: { id: true } }),
    apiPrisma.scheduleType.findMany({ select: { id: true } }),
    apiPrisma.scheduleSpan.findMany({ select: { id: true } }),
    apiPrisma.taxCode.findMany({ select: { id: true } }),
    apiPrisma.opportunityType.findFirst({
      orderBy: { id: "asc" },
      select: { id: true },
    }),
    apiPrisma.cwMember.findMany({
      select: { cwMemberId: true, identifier: true },
    }),
  ]);

  // Replaces the contents of an id set with the ids of the freshly fetched rows.
  const replaceIds = <T>(
    target: Set<T>,
    rows: ReadonlyArray<{ id: T }>
  ): void => {
    target.clear();
    for (const row of rows) {
      target.add(row.id);
    }
  };

  replaceIds(context.userIds, users);
  replaceIds(context.serviceTicketIds, serviceTickets);
  replaceIds(context.opportunityIds, opportunities);
  replaceIds(context.catalogItemIds, catalogItems);
  replaceIds(context.corporateLocationIds, locations);
  replaceIds(context.opportunityStatusIds, opportunityStatuses);
  replaceIds(context.companyIds, companies);
  replaceIds(context.companyAddressIds, companyAddresses);
  replaceIds(context.contactIds, contacts);
  replaceIds(context.opportunityStageIds, opportunityStages);
  replaceIds(context.scheduleStatusIds, scheduleStatuses);
  replaceIds(context.scheduleTypeIds, scheduleTypes);
  replaceIds(context.scheduleSpanIds, scheduleSpans);
  replaceIds(context.taxCodeIds, taxCodes);

  context.usersByMemberRecId.clear();
  context.userIdentifiersByMemberRecId.clear();
  context.usersByIdentifier.clear();
  for (const user of users) {
    if (user.cwMemberId != null) {
      context.usersByMemberRecId.set(user.cwMemberId, user.id);
      if (user.cwIdentifier) {
        context.userIdentifiersByMemberRecId.set(
          user.cwMemberId,
          user.cwIdentifier
        );
      }
    }
    if (user.cwIdentifier) {
      context.usersByIdentifier.set(user.cwIdentifier, user.id);
    }
  }
  // cwMember rows only fill member ids that no user row already supplied an identifier for.
  for (const member of cwMembers) {
    if (
      member.cwMemberId != null &&
      member.identifier &&
      !context.userIdentifiersByMemberRecId.has(member.cwMemberId)
    ) {
      context.userIdentifiersByMemberRecId.set(
        member.cwMemberId,
        member.identifier
      );
    }
  }

  context.serviceTicketBoardUidsById.clear();
  for (const board of boards) {
    context.serviceTicketBoardUidsById.set(board.id, board.uid);
  }

  context.defaultOpportunityTypeId = defaultOpportunityType?.id ?? null;

  // Optional context fed from env-provided maps.
  context.billingTypeByTicketId = parseJsonMapFromEnv(
    "BILLING_TYPE_CONTEXT_JSON"
  );
  context.billingInstructionsByTicketId = parseJsonMapFromEnv(
    "BILLING_INSTRUCTIONS_CONTEXT_JSON"
  );
};
/** Trims and lower-cases a custom-field name; anything that is not a string becomes "". */
const normalizeNarrativeFieldName = (value: unknown): string =>
  typeof value === "string" ? value.trim().toLowerCase() : "";
/**
 * Tells whether a custom-field name contains "narrative".
 * The comparison is case-sensitive, so callers pass an already lower-cased name.
 */
const isNarrativeCustomField = (fieldName: string): boolean =>
  Boolean(fieldName) && fieldName.includes("narrative");
/** Returns the trimmed text, or null when the value is not a string or is blank after trimming. */
const normalizeNullableText = (value: unknown): string | null => {
  if (typeof value !== "string") {
    return null;
  }
  const text = value.trim();
  return text === "" ? null : text;
};
/**
 * Reloads the custom-field lookups on `context` from two source-database views.
 *
 * - `productCustomByIvProductId`: procurement notes and product narrative per product id.
 * - `opportunityNarrativeByOpportunityId`: the first non-blank custom value whose field name
 *   contains "narrative", per opportunity id.
 *
 * Both maps are cleared before the queries run. Rows whose id is not an integer are ignored.
 *
 * @param cwPrisma Client for the source database.
 * @param context Translation context mutated in place.
 */
const refreshCustomFieldContextFromCw = async (
  cwPrisma: CwPrismaClient,
  context: TranslationContext
): Promise<void> => {
  // Accepts ids that arrive either as numbers or as numeric text; null means "not an integer".
  const toIntegerId = (rawId: unknown): number | null => {
    const candidate =
      typeof rawId === "number" ? rawId : Number.parseInt(String(rawId), 10);
    return Number.isInteger(candidate) ? candidate : null;
  };

  context.opportunityNarrativeByOpportunityId.clear();
  context.productCustomByIvProductId.clear();

  const productQuery = cwPrisma.$queryRawUnsafe<Array<Record<string, unknown>>>(`
SELECT
IV_Product_RecID,
ProcurementNotes,
ProductNarrative
FROM dbo.v_IV_Product_Custom_Fields
`);
  const opportunityQuery = cwPrisma.$queryRawUnsafe<
    Array<Record<string, unknown>>
  >(`
SELECT
Opportunity_RecID,
opportunity_customfield,
opportunity_customvalue
FROM dbo.v_rpt_OpportunityCustomFields
WHERE opportunity_customvalue IS NOT NULL
`);
  const [productCustomRows, opportunityCustomRows] = await Promise.all([
    productQuery,
    opportunityQuery,
  ]);

  for (const productRow of productCustomRows) {
    const productId = toIntegerId(productRow.IV_Product_RecID);
    if (productId === null) continue;
    context.productCustomByIvProductId.set(productId, {
      procurementNotes: normalizeNullableText(productRow.ProcurementNotes),
      productNarrative: normalizeNullableText(productRow.ProductNarrative),
    });
  }

  for (const customRow of opportunityCustomRows) {
    const opportunityId = toIntegerId(customRow.Opportunity_RecID);
    if (opportunityId === null) continue;
    const customFieldName = normalizeNarrativeFieldName(
      customRow.opportunity_customfield
    );
    if (!isNarrativeCustomField(customFieldName)) continue;
    const narrative = normalizeNullableText(customRow.opportunity_customvalue);
    if (!narrative) continue;
    // First narrative-looking custom field wins for each opportunity.
    if (context.opportunityNarrativeByOpportunityId.has(opportunityId)) continue;
    context.opportunityNarrativeByOpportunityId.set(opportunityId, narrative);
  }
};
/**
 * Nulls out user references that do not resolve against the current context.
 *
 * Only `serviceTicket`, `serviceTicketNote`, `company` and `companyAddress` payloads are
 * touched; for any other model the input object itself is returned. Fields holding a user id
 * are checked against `context.userIds`; fields holding a user identifier are checked against
 * `context.usersByIdentifier`. Only string values are checked.
 *
 * @param data Translated payload; not mutated.
 * @param context Context providing the known users.
 * @param targetModel Target model name selecting which fields are checked.
 * @returns A sanitized copy, or `data` itself for models without user references.
 */
const sanitizeUserForeignKeys = (
  data: Row,
  context: TranslationContext,
  targetModel: string
): Row => {
  let idFields: string[];
  let identifierFields: string[];
  switch (targetModel) {
    case "serviceTicket":
      idFields = ["createdById", "updatedById", "closedById"];
      identifierFields = ["ticketOwnerId"];
      break;
    case "serviceTicketNote":
      idFields = ["authorId"];
      identifierFields = [];
      break;
    case "company":
      idFields = [];
      identifierFields = ["enteredById", "deletedById"];
      break;
    case "companyAddress":
      idFields = [];
      identifierFields = ["updatedById"];
      break;
    default:
      return data;
  }

  const cleaned = { ...data };
  idFields.forEach((fieldName) => {
    const current = cleaned[fieldName];
    if (typeof current === "string" && !context.userIds.has(current)) {
      cleaned[fieldName] = null;
    }
  });
  identifierFields.forEach((fieldName) => {
    const current = cleaned[fieldName];
    if (
      typeof current === "string" &&
      !context.usersByIdentifier.has(current)
    ) {
      cleaned[fieldName] = null;
    }
  });
  return cleaned;
};
/**
 * Applies model-specific clean-up to a translated payload before it is upserted.
 *
 * - `warehouseBin`: missing min/max quantities become 0.
 * - `contact`, `serviceTicket`, `opportunity`, `schedule`: references to rows that are not in
 *   the context's id sets are set to null.
 * - `companyAddress`: a missing or unknown `companyId` raises `SkipRowError`.
 * - `serviceTicket`: a string `serviceTicketBoardId` is parsed to an integer (null when unparsable).
 * - `opportunity`: a missing `typeId` falls back to the context's default opportunity type.
 *
 * @param data Translated payload; not mutated.
 * @param step Step whose `targetModel` selects the rules.
 * @param context Context providing the id sets of rows already present in the API.
 * @returns A sanitized copy of `data`.
 * @throws SkipRowError for a company address whose parent company is unknown.
 */
const sanitizeModelData = (
  data: Row,
  step: Step,
  context: TranslationContext
): Row => {
  const sanitized = { ...data };

  // Sets `field` to null when it holds a value that is not in `knownIds`.
  const nullifyUnlessKnown = (
    field: string,
    knownIds: { has(value: number): boolean }
  ): void => {
    const current = sanitized[field];
    if (current != null && !knownIds.has(current as number)) {
      sanitized[field] = null;
    }
  };

  switch (step.targetModel) {
    case "warehouseBin": {
      if (sanitized.minQuantity == null) sanitized.minQuantity = 0;
      if (sanitized.maxQuantity == null) sanitized.maxQuantity = 0;
      break;
    }
    case "contact": {
      nullifyUnlessKnown("companyAddressId", context.companyAddressIds);
      nullifyUnlessKnown("companyId", context.companyIds);
      break;
    }
    case "companyAddress": {
      // companyId is required — skip the row if the parent company hasn't synced yet
      const parentKnown =
        sanitized.companyId != null &&
        context.companyIds.has(sanitized.companyId as number);
      if (!parentKnown) {
        throw new SkipRowError(
          `CompanyAddress companyId=${sanitized.companyId} not found in API — skipping`
        );
      }
      break;
    }
    case "serviceTicket": {
      const rawBoardId = sanitized.serviceTicketBoardId;
      if (typeof rawBoardId === "string") {
        const boardNumber = Number.parseInt(rawBoardId, 10);
        sanitized.serviceTicketBoardId = Number.isNaN(boardNumber)
          ? null
          : boardNumber;
      }
      nullifyUnlessKnown("contactId", context.contactIds);
      nullifyUnlessKnown("companyAddressId", context.companyAddressIds);
      nullifyUnlessKnown("companyId", context.companyIds);
      nullifyUnlessKnown("billingCompanyId", context.companyIds);
      nullifyUnlessKnown("billingAddressId", context.companyAddressIds);
      break;
    }
    case "opportunity": {
      if (sanitized.typeId == null && context.defaultOpportunityTypeId != null) {
        sanitized.typeId = context.defaultOpportunityTypeId;
      }
      nullifyUnlessKnown("statusId", context.opportunityStatusIds);
      nullifyUnlessKnown("companyId", context.companyIds);
      nullifyUnlessKnown("contactId", context.contactIds);
      // siteId is checked against the company address ids.
      nullifyUnlessKnown("siteId", context.companyAddressIds);
      nullifyUnlessKnown("stageId", context.opportunityStageIds);
      nullifyUnlessKnown("locationId", context.corporateLocationIds);
      nullifyUnlessKnown("taxCodeId", context.taxCodeIds);
      break;
    }
    case "schedule": {
      nullifyUnlessKnown("statusId", context.scheduleStatusIds);
      nullifyUnlessKnown("typeId", context.scheduleTypeIds);
      nullifyUnlessKnown("scheduleSpanId", context.scheduleSpanIds);
      break;
    }
    default:
      break;
  }
  return sanitized;
};
/** Narrows an unknown value to a string-keyed record; returns null for primitives, null and undefined. */
const asRecord = (value: unknown): Record<string, unknown> | null =>
  value !== null && value !== undefined && typeof value === "object"
    ? (value as Record<string, unknown>)
    : null;
/**
 * Extracts backtick-quoted tokens from an error message as candidate field names.
 *
 * Tokens are trimmed and de-duplicated in order of first appearance; the non-field tokens
 * "targetDelegate.upsert()" and "invocation" are ignored.
 *
 * @param message Error message text.
 * @returns Distinct candidate field names.
 */
const parseFieldsFromErrorMessage = (message: string): string[] => {
  // Handles patterns like (`"cwIdentifier"`) and (`login`)
  const tokens = Array.from(
    message.matchAll(/`\"([^\"]+)\"`|`([^`]+)`/g),
    (found) => (found[1] ?? found[2] ?? "").trim()
  );
  // Common non-field tokens that can appear in error messages are dropped.
  const fieldTokens = tokens.filter(
    (token) =>
      token !== "" &&
      token !== "targetDelegate.upsert()" &&
      token !== "invocation"
  );
  return [...new Set(fieldTokens)];
};
/**
 * Returns the field names involved in a unique-constraint violation (error code "P2002").
 *
 * The names come from `error.meta.target` when it is an array or a string. When no usable
 * target is available — including when `meta` is missing altogether — they are parsed from
 * the error message instead.
 *
 * @param error Value caught from a failed write.
 * @returns Field names, or [] when the error is not a P2002 error or nothing can be extracted.
 */
const getUniqueConstraintFields = (error: unknown): string[] => {
  const errorRecord = asRecord(error);
  if (!errorRecord || errorRecord.code !== "P2002") {
    return [];
  }
  // A missing `meta` must not short-circuit: the message fallback below still applies.
  const target = asRecord(errorRecord.meta)?.target;
  if (Array.isArray(target)) {
    return target.filter((field): field is string => typeof field === "string");
  }
  if (typeof target === "string") {
    return [target];
  }
  if (typeof errorRecord.message === "string") {
    return parseFieldsFromErrorMessage(errorRecord.message);
  }
  return [];
};
/**
 * Renders a value for log messages.
 *
 * - Dates are printed as ISO strings.
 * - Strings are JSON-quoted.
 * - Numbers, booleans, bigints, null and undefined use their plain string form.
 * - Everything else is JSON-serialized. Values JSON cannot represent (functions, symbols)
 *   fall back to `String(value)`, and values that make serialization throw (for example
 *   circular structures) print as "[unserializable]".
 *
 * @param value Any value.
 * @returns A printable representation; always a string.
 */
const formatValue = (value: unknown): string => {
  if (value instanceof Date) {
    return value.toISOString();
  }
  if (typeof value === "string") {
    return JSON.stringify(value);
  }
  if (
    typeof value === "number" ||
    typeof value === "boolean" ||
    value == null
  ) {
    return String(value);
  }
  // JSON.stringify throws on bigint, which would hide bigint ids behind "[unserializable]".
  if (typeof value === "bigint") {
    return value.toString();
  }
  try {
    // JSON.stringify returns undefined for functions and symbols.
    return JSON.stringify(value) ?? String(value);
  } catch {
    return "[unserializable]";
  }
};
/**
 * Builds a log message for a unique-constraint failure.
 *
 * @param error Value caught from the failed write.
 * @param data Payload that was being written, or null when translation failed before one existed.
 * @returns A message naming the violated fields and the payload's value for each of them, or a
 *   generic message when no field names can be determined.
 */
const formatUniqueConstraintError = (
  error: unknown,
  data: Row | null
): string => {
  const violatedFields = getUniqueConstraintFields(error);
  if (violatedFields.length === 0) {
    return "Unique constraint failed.";
  }
  const fieldList = violatedFields.join(", ");
  const valueList = violatedFields
    .map((fieldName) => `${fieldName}=${formatValue(data?.[fieldName])}`)
    .join(", ");
  return `Unique constraint failed on field(s): ${fieldList}. Values: ${valueList}`;
};
/**
 * Converts a primitive id into a type-tagged string so source and target ids can be compared
 * through a `Set<string>`.
 *
 * The tag keeps values of different types apart (the number 1 and the string "1" produce
 * different keys).
 *
 * @param value Candidate id.
 * @returns The tagged key, or null for null/undefined, NaN and non-primitive values.
 */
const toComparableKey = (value: unknown): string | null => {
  switch (typeof value) {
    case "number":
      return Number.isNaN(value) ? null : `number:${value}`;
    case "string":
      return `string:${value}`;
    case "bigint":
      return `bigint:${value.toString()}`;
    case "boolean":
      return `boolean:${value}`;
    default:
      return null;
  }
};
/**
 * Deletes target rows whose unique value no longer appears among the source ids.
 *
 * Reads every source id (restricted by `step.sourceArgs.where` when present) and every target
 * unique value (restricted by `step.targetDeleteWhere` when present), then removes the target
 * rows that have no matching source id. Deletes run in chunks of 500 via `deleteMany`; when a
 * chunk fails, its rows are deleted one at a time so a single failing row does not block the
 * rest of the chunk.
 *
 * @param cwPrisma Source client; the delegate is looked up by `step.sourceModel`.
 * @param apiPrisma Target client; the delegate is looked up by `step.targetModel`.
 * @param step Step describing the models and key fields.
 * @returns Counts of deleted rows and of rows whose single-row delete failed.
 * @throws Error when either delegate cannot be found on its client.
 */
const reconcileStepDeletes = async (
cwPrisma: CwPrismaClient,
apiPrisma: ApiPrismaClient,
step: Step
): Promise<DeleteResult> => {
const sourceDelegate = (
cwPrisma as unknown as Record<string, { findMany: Function }>
)[step.sourceModel];
const targetDelegate = (
apiPrisma as unknown as Record<
string,
{ findMany: Function; deleteMany: Function; delete: Function }
>
)[step.targetModel];
if (!sourceDelegate) {
throw new Error(`CW delegate not found: ${step.sourceModel}`);
}
if (!targetDelegate) {
throw new Error(`API delegate not found: ${step.targetModel}`);
}
// Only the step's `where` filter is reused; other source args are not applied to this id scan.
const sourceWhere =
(step.sourceArgs as Record<string, unknown> | undefined)?.where ??
undefined;
const sourceRows = (await sourceDelegate.findMany({
...(sourceWhere ? { where: sourceWhere } : {}),
select: {
[step.sourceIdField]: true,
},
})) as Row[];
// Type-tagged keys of every id that still exists in the source.
const sourceKeys = new Set<string>();
for (const sourceRow of sourceRows) {
const sourceKey = toComparableKey(sourceRow[step.sourceIdField]);
if (sourceKey) {
sourceKeys.add(sourceKey);
}
}
const targetRows = (await targetDelegate.findMany({
...(step.targetDeleteWhere ? { where: step.targetDeleteWhere } : {}),
select: {
[step.uniqueField]: true,
},
})) as Row[];
// Target unique values with no counterpart in the source.
const staleValues: Array<string | number | bigint | boolean> = [];
for (const targetRow of targetRows) {
const value = targetRow[step.uniqueField];
const targetKey = toComparableKey(value);
// Values that cannot be turned into a key (null, objects, NaN) are never treated as stale.
if (!targetKey) {
continue;
}
if (!sourceKeys.has(targetKey)) {
staleValues.push(value as string | number | bigint | boolean);
}
}
if (staleValues.length === 0) {
return { deleted: 0, failed: 0 };
}
const chunkSize = 500;
let deleted = 0;
let failed = 0;
let sampleErrorsPrinted = 0;
for (let index = 0; index < staleValues.length; index += chunkSize) {
const chunk = staleValues.slice(index, index + chunkSize);
try {
const deleteManyResult = (await targetDelegate.deleteMany({
where: {
...(step.targetDeleteWhere ?? {}),
[step.uniqueField]: { in: chunk },
},
})) as unknown;
// Use the reported count when there is one; otherwise assume the whole chunk was removed.
const deletedCount = asRecord(deleteManyResult)?.count as
| number
| undefined;
deleted += typeof deletedCount === "number" ? deletedCount : chunk.length;
continue;
} catch {
// Fall back to row-by-row deletes so one FK violation does not block all deletions.
}
// Reached only when the bulk delete above threw. The single-row delete filters on the
// unique field alone; `targetDeleteWhere` is not applied here.
for (const value of chunk) {
try {
await targetDelegate.delete({
where: {
[step.uniqueField]: value,
},
});
deleted += 1;
} catch (error) {
failed += 1;
// Only the first five failures are printed in detail.
if (sampleErrorsPrinted < 5) {
sampleErrorsPrinted += 1;
const message =
error instanceof Error ? error.message : "Unknown delete error";
console.error(
`Delete reconcile failed in ${step.name} for ${
step.uniqueField
}=${formatValue(value)}:`,
message
);
}
}
}
}
// Summarize the failures that were not printed individually.
if (failed > sampleErrorsPrinted) {
console.error(
`${step.name}: suppressed ${
failed - sampleErrorsPrinted
} additional delete errors`
);
}
return { deleted, failed };
};
/**
 * Outcome of sync planning for one step: either sync everything ("full") or only the listed
 * source ids ("incremental"). `differences` carries the per-row findings behind the decision
 * and may be empty.
 */
type SmartSyncDecision =
| { mode: "full"; differences: SmartSyncDifference[] }
| {
mode: "incremental";
sourceIds: number[];
differences: SmartSyncDifference[];
};
/** One sampled source row that differs from its target counterpart. */
type SmartSyncDifference = {
/** Source id of the row. */
sourceId: number;
/** "missing-in-api": no target row was found; "cw-newer": the source timestamp is later than the target's. */
reason: "missing-in-api" | "cw-newer";
/** Source timestamp used in the comparison. */
cwUpdatedAt: Date;
/** Target timestamp used in the comparison; null when the row is missing in the target. */
apiUpdatedAt: Date | null;
};
/**
 * Logs every difference found for a step: a header line followed by one line per difference,
 * or a single "no differences" line when the list is empty.
 *
 * @param step Step the differences belong to.
 * @param differences Differences to print.
 */
const logAllSmartSyncDifferences = (
  step: Step,
  differences: SmartSyncDifference[]
): void => {
  const total = differences.length;
  if (total === 0) {
    console.log(
      ` [smart-sync][details] ${step.name}: no differences in sampled records`
    );
    return;
  }
  console.log(
    ` [smart-sync][details] ${step.name}: logging all ${differences.length} differences`
  );
  differences.forEach((diff) => {
    const cwUpdated = diff.cwUpdatedAt.toISOString();
    const apiUpdated =
      diff.apiUpdatedAt === null || !diff.apiUpdatedAt
        ? "null"
        : diff.apiUpdatedAt.toISOString();
    console.log(
      ` [diff] sourceModel=${step.sourceModel} targetModel=${step.targetModel} id=${diff.sourceId} reason=${diff.reason} cwUpdatedAt=${cwUpdated} apiUpdatedAt=${apiUpdated}`
    );
  });
};
/**
 * Plans how a step should be synced by comparing the source with the target.
 *
 * 1. Unless `forceIncremental` is set, row counts are compared; a gap larger than
 *    max(50, 5% of the larger count) in either direction yields a full sync.
 * 2. The 1000 most recently updated source rows are compared with their target counterparts
 *    by timestamp. Rows missing in the target or newer in the source are differences.
 * 3. More than 100 differences yields a full sync (unless `forceIncremental`); otherwise the
 *    differing ids are returned for an incremental sync.
 *
 * A full sync is also chosen when a delegate is missing or when the step declares no target
 * timestamp field (`targetUpdatedField: null`).
 *
 * @param cwPrisma Source client.
 * @param apiPrisma Target client.
 * @param step Step being planned.
 * @param forceIncremental Skips the count check and never escalates on the difference count.
 * @returns The sync decision together with the differences found in the sample.
 */
const computeSmartSyncDecision = async (
  cwPrisma: CwPrismaClient,
  apiPrisma: ApiPrismaClient,
  step: Step,
  forceIncremental = false
): Promise<SmartSyncDecision> => {
  type OptionalDelegate = { findMany: Function } | undefined;
  const cwDelegate = (cwPrisma as unknown as Record<string, OptionalDelegate>)[
    step.sourceModel
  ];
  const apiDelegate = (
    apiPrisma as unknown as Record<string, OptionalDelegate>
  )[step.targetModel];
  if (!cwDelegate || !apiDelegate) {
    return { mode: "full", differences: [] };
  }

  const existingWhere =
    (step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};

  // Count-based check: force a full sync when the two databases have diverged
  // significantly in either direction.
  // CW >> API → rows are missing from API (old records never in recency window)
  // API >> CW → stale rows exist in API (need delete reconciliation via full sync)
  if (!forceIncremental) {
    const cwCounter = cwDelegate as unknown as { count: Function };
    const apiCounter = apiDelegate as unknown as { count: Function };
    const [cwCount, apiCount] = await Promise.all([
      cwCounter.count({ where: existingWhere }),
      apiCounter.count(),
    ]);
    // positive = API missing rows, negative = API has extra rows
    const gap = cwCount - apiCount;
    const largerCount = Math.max(cwCount, apiCount);
    const threshold = Math.max(50, Math.floor(largerCount * 0.05));
    if (Math.abs(gap) > threshold) {
      const direction = gap > 0 ? "api-missing-rows" : "api-has-stale-rows";
      console.log(
        ` [smart-sync] ${step.name}: count gap detected (cw=${cwCount}, api=${apiCount}, gap=${gap}, direction=${direction}) — forcing full sync`
      );
      return { mode: "full", differences: [] };
    }
  }

  // Most recently updated source rows: id and timestamp only.
  const cwSample = (await cwDelegate.findMany({
    select: {
      [step.sourceIdField]: true,
      [step.sourceUpdatedField]: true,
    },
    where: existingWhere,
    orderBy: { [step.sourceUpdatedField]: "desc" },
    take: 1000,
  })) as Row[];
  if (cwSample.length === 0) {
    return { mode: "incremental", sourceIds: [], differences: [] };
  }

  // Without a target timestamp field there is nothing to compare against: do a full sync.
  const apiUpdatedField =
    step.targetUpdatedField === undefined
      ? "updatedAt"
      : step.targetUpdatedField;
  if (apiUpdatedField === null) {
    return { mode: "full", differences: [] };
  }

  // Target rows matching the sampled source ids.
  const sampledIds = cwSample.map((r) => r[step.sourceIdField] as number);
  const apiSample = (await apiDelegate.findMany({
    select: {
      [step.uniqueField]: true,
      [apiUpdatedField]: true,
    },
    where: { [step.uniqueField]: { in: sampledIds } },
  })) as Row[];

  // A target row without a timestamp is treated as updated at the epoch.
  const apiUpdatedById = new Map<number, Date>();
  for (const apiRow of apiSample) {
    apiUpdatedById.set(
      apiRow[step.uniqueField] as number,
      (apiRow[apiUpdatedField] as Date | null) ?? new Date(0)
    );
  }

  const differenceRows: SmartSyncDifference[] = [];
  for (const cwRow of cwSample) {
    const sourceId = cwRow[step.sourceIdField] as number;
    const cwUpdatedAt =
      (cwRow[step.sourceUpdatedField] as Date | null) ?? new Date(0);
    const apiUpdatedAt = apiUpdatedById.get(sourceId);
    if (apiUpdatedAt === undefined) {
      differenceRows.push({
        sourceId,
        reason: "missing-in-api",
        cwUpdatedAt,
        apiUpdatedAt: null,
      });
    } else if (cwUpdatedAt.getTime() > apiUpdatedAt.getTime()) {
      differenceRows.push({
        sourceId,
        reason: "cw-newer",
        cwUpdatedAt,
        apiUpdatedAt,
      });
    }
  }

  const differences = differenceRows.length;
  console.log(
    ` [smart-sync] ${step.name}: ${differences} differences in top 1000`
  );

  if (!forceIncremental && differences > 100) {
    return { mode: "full", differences: differenceRows };
  }
  return {
    mode: "incremental",
    sourceIds: differenceRows.map((d) => d.sourceId),
    differences: differenceRows,
  };
};
/**
 * Copies rows for one step from the source database into the target database.
 *
 * Each source row is translated, sanitized and upserted into the target keyed by
 * `step.uniqueField`. With `sourceIdsFilter` only the listed source ids are fetched
 * (incremental sync); without it the whole source model is read in pages (full sync).
 * Row-level failures are counted and do not abort the step.
 *
 * @param cwPrisma Source client; the delegate is looked up by `step.sourceModel`.
 * @param apiPrisma Target client; the delegate is looked up by `step.targetModel`.
 * @param context Translation context passed to the translation and the sanitizers.
 * @param step Step describing models, translation and key fields.
 * @param sourceIdsFilter Source ids to sync; an empty array means there is nothing to do.
 * @returns Counts of upserted, skipped and failed rows.
 * @throws Error when either delegate cannot be found on its client.
 */
const syncStep = async (
cwPrisma: CwPrismaClient,
apiPrisma: ApiPrismaClient,
context: TranslationContext,
step: Step,
sourceIdsFilter?: number[]
): Promise<StepResult> => {
const sourceDelegate = (
cwPrisma as unknown as Record<string, { findMany: Function }>
)[step.sourceModel];
const targetDelegate = (
apiPrisma as unknown as Record<string, { upsert: Function }>
)[step.targetModel];
if (!sourceDelegate) {
throw new Error(`CW delegate not found: ${step.sourceModel}`);
}
if (!targetDelegate) {
throw new Error(`API delegate not found: ${step.targetModel}`);
}
// An explicit empty id list means "incremental with no changes": return without querying.
if (sourceIdsFilter !== undefined && sourceIdsFilter.length === 0) {
return { insertedOrUpdated: 0, skipped: 0, failed: 0 };
}
const baseArgs: Record<string, unknown> = step.sourceArgs ?? {};
// Counters shared with the processRow closure below.
let insertedOrUpdated = 0;
let skipped = 0;
let failed = 0;
let sampleErrorsPrinted = 0;
// Translates, sanitizes and upserts a single row; never throws.
const processRow = async (row: Row) => {
// Kept outside the try block so the catch handler can report the values that were being written.
let translatedData: Row | null = null;
try {
const translated = translateRow(row, step.translation, context);
const userSanitized = sanitizeUserForeignKeys(
translated,
context,
step.targetModel
);
const data = sanitizeModelData(userSanitized, step, context);
translatedData = data;
const uniqueValue = data[step.uniqueField];
// Rows without a usable unique value cannot be upserted and are counted as skipped.
if (
uniqueValue === undefined ||
uniqueValue === null ||
uniqueValue === ""
) {
skipped += 1;
return;
}
// The same payload is used for both the create and the update branch.
await targetDelegate.upsert({
where: {
[step.uniqueField]: uniqueValue,
},
create: data,
update: data,
});
insertedOrUpdated += 1;
} catch (error) {
// SkipRowError marks a deliberate skip, not a failure.
if (error instanceof SkipRowError) {
skipped += 1;
return;
}
failed += 1;
// Only the first five failures are printed in detail.
if (sampleErrorsPrinted < 5) {
sampleErrorsPrinted += 1;
const uniqueFields = getUniqueConstraintFields(error);
const message =
uniqueFields.length > 0
? formatUniqueConstraintError(error, translatedData)
: error instanceof Error
? error.message
: "Unknown row sync error";
console.error(
`Failed row in ${step.name} (source ${step.sourceModel} -> target ${step.targetModel}):`,
message
);
}
}
};
if (sourceIdsFilter !== undefined) {
// Incremental sync: fetch only the specific IDs
const existingWhere = (baseArgs as Record<string, unknown>).where ?? {};
const findManyArgs = {
...baseArgs,
where: {
...(existingWhere as Record<string, unknown>),
[step.sourceIdField]: { in: sourceIdsFilter },
},
};
const rows = (await sourceDelegate.findMany(findManyArgs)) as Row[];
// Rows are processed one at a time, in order.
for (const row of rows) {
await processRow(row);
}
} else {
// Full sync: paginate through all records to avoid loading 100K+ rows at once
const FULL_SYNC_PAGE_SIZE = 5000;
let skip = 0;
while (true) {
// Pages are ordered by the source id field; this replaces any orderBy/take/skip in baseArgs.
const pageArgs: Record<string, unknown> = {
...baseArgs,
orderBy: { [step.sourceIdField]: "asc" as const },
take: FULL_SYNC_PAGE_SIZE,
skip,
};
const pageRows = (await sourceDelegate.findMany(pageArgs)) as Row[];
for (const row of pageRows) {
await processRow(row);
}
// A short page is the last page.
if (pageRows.length < FULL_SYNC_PAGE_SIZE) break;
skip += FULL_SYNC_PAGE_SIZE;
}
}
// Summarize the failures that were not printed individually.
if (failed > sampleErrorsPrinted) {
console.error(
`${step.name}: suppressed ${
failed - sampleErrorsPrinted
} additional row errors`
);
}
return { insertedOrUpdated, skipped, failed };
};
/**
 * Runs the CW (MSSQL) -> API (Postgres) sync over every configured table step,
 * optionally followed by delete reconciliation, and records progress on a
 * `syncJobRun` row when a `jobRunId` is supplied.
 *
 * Flow:
 *  1. Resolve both connection strings and build the two Prisma clients.
 *  2. Mark the job RUNNING and prime the translation context.
 *  3. For each step (in array order): pick full vs. incremental mode, upsert
 *     rows via `syncStep`, and write a step log.
 *  4. If `reconcileDeletes` is enabled, reconcile deletes (all steps in reverse
 *     order for a regular run; a single rotating step for forced-incremental).
 *  5. Mark the job COMPLETED.
 *
 * Abort behaviour: a context-refresh failure or an exceeded timeout marks the
 * job FAILED / TIMED_OUT and then *returns normally* (no exception). Only
 * unexpected errors are rethrown, after the job has been marked FAILED.
 *
 * @param options.forceIncremental  When `true`, passed through to the smart-sync
 *   decision helpers and limits delete reconciliation to one step per run.
 * @param options.logAllDifferences Log every smart-sync difference. Defaults to
 *   `!forceIncremental`.
 * @param options.reconcileDeletes  Run delete reconciliation. Defaults to `true`.
 * @param options.jobRunId          `syncJobRun` id to update; when omitted no job
 *   status or step logs are written.
 * @param options.timeoutMs         Overall budget, checked between steps only.
 *   Defaults to 30 minutes.
 * @throws Error when either database connection string is missing, or when an
 *   unexpected error escapes a sync step (rethrown unchanged).
 */
export const executeFullDalpuriSync = async (options?: {
  forceIncremental?: boolean;
  logAllDifferences?: boolean;
  reconcileDeletes?: boolean;
  jobRunId?: string;
  timeoutMs?: number;
}): Promise<void> => {
  const forceIncremental = options?.forceIncremental === true;
  const logAllDifferences = options?.logAllDifferences ?? !forceIncremental;
  const reconcileDeletes = options?.reconcileDeletes ?? true;
  const jobRunId = options?.jobRunId;
  const timeoutMs = options?.timeoutMs ?? 30 * 60 * 1000; // 30 minutes default

  // `||` (not `??`) so that an empty CW_DATABASE_URL falls through to DATABASE_URL.
  const cwDatabaseUrl = process.env.CW_DATABASE_URL || process.env.DATABASE_URL;
  const apiDatabaseUrl = resolveApiDatabaseUrl();
  if (!cwDatabaseUrl) {
    throw new Error(
      "Missing CW DB connection string. Set CW_DATABASE_URL or DATABASE_URL."
    );
  }
  if (!apiDatabaseUrl) {
    throw new Error(
      "Missing API DB connection string. Set API_DATABASE_URL/OPTIMA_API_DATABASE_URL or ensure ../api/.env has DATABASE_URL."
    );
  }

  const cwAdapter = new PrismaMssql(cwDatabaseUrl);
  const apiAdapter = new PrismaPg({ connectionString: apiDatabaseUrl });
  const cwPrisma = new CwPrismaClient({ adapter: cwAdapter });
  const apiPrisma = new ApiPrismaClient({ adapter: apiAdapter });
  const context = createTranslationContext();

  // Upper bound applied to every error summary persisted on the job run.
  const MAX_ERROR_SUMMARY_LENGTH = 2000;

  // Normalises any thrown value into a printable message.
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  // Best-effort update of the job run row; never throws. No-op without a jobRunId.
  const updateJobStatus = async (
    status: "RUNNING" | "COMPLETED" | "FAILED" | "TIMED_OUT",
    errorSummary?: string
  ) => {
    if (!jobRunId) return;
    try {
      await apiPrisma.syncJobRun.update({
        where: { id: jobRunId },
        data: {
          status,
          // RUNNING stamps the start time; every other status is terminal.
          ...(status === "RUNNING"
            ? { startedAt: new Date() }
            : { completedAt: new Date() }),
          ...(errorSummary
            ? { errorSummary: errorSummary.slice(0, MAX_ERROR_SUMMARY_LENGTH) }
            : {}),
        },
      });
    } catch (err) {
      console.error("[sync] Failed to update job status:", err);
    }
  };

  // Best-effort write of one step log row; never throws. No-op without a jobRunId.
  const writeStepLog = async (
    tableName: string,
    syncMode: string,
    result: StepResult,
    deleteResult: DeleteResult,
    durationMs: number,
    sampleErrors: string[] = []
  ) => {
    if (!jobRunId) return;
    try {
      await apiPrisma.syncStepLog.create({
        data: {
          syncJobRunId: jobRunId,
          tableName,
          syncMode,
          recordsProcessed:
            result.insertedOrUpdated + result.skipped + result.failed,
          recordsInserted: result.insertedOrUpdated,
          recordsSkipped: result.skipped,
          recordsFailed: result.failed,
          recordsDeleted: deleteResult.deleted,
          sampleErrors,
          durationMs,
        },
      });
    } catch (err) {
      console.error("[sync] Failed to write step log:", err);
    }
  };

  // Disconnects one client without letting a failure escape, so the other
  // client is still released and the original sync error is not masked.
  const disconnectQuietly = async (
    label: string,
    client: { $disconnect: () => Promise<unknown> }
  ) => {
    try {
      await client.$disconnect();
    } catch (err) {
      console.error(`[sync] Failed to disconnect ${label} Prisma client:`, err);
    }
  };

  // Timeout tracking (only evaluated between steps, never mid-step).
  const syncStartTime = Date.now();
  const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;

  // Target models whose step is preceded by an API-side context refresh.
  const apiContextRefreshTargets = new Set<string>([
    "user",
    "contact",
    "serviceTicketBoard",
    "serviceTicket",
    "opportunity",
    "productData",
    "serviceTicketNote",
    "schedule",
  ]);
  // Subset that additionally refreshes the CW custom-field context.
  const cwCustomFieldRefreshTargets = new Set<string>([
    "opportunity",
    "productData",
  ]);

  // Steps are synced in array order; a regular delete reconciliation walks them in reverse.
  // NOTE(review): presumably ordered so referenced tables are populated before
  // the tables that point at them — confirm before reordering.
  const steps: Step[] = [
    {
      name: "CW Members",
      sourceModel: "member",
      targetModel: "cwMember",
      translation: cwMemberTranslation as unknown as AnyTranslation,
      uniqueField: "cwMemberId",
      sourceIdField: "memberRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Users",
      sourceModel: "member",
      targetModel: "user",
      translation: userTranslation as unknown as AnyTranslation,
      uniqueField: "cwMemberId",
      sourceIdField: "memberRecId",
      sourceUpdatedField: "lastUpdatedUtc",
      targetDeleteWhere: {
        cwMemberId: {
          not: null,
        },
      },
    },
    {
      name: "Companies",
      sourceModel: "company",
      targetModel: "company",
      translation: companyTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "companyRecId",
      sourceUpdatedField: "lastUpdate",
    },
    {
      name: "Company Addresses",
      sourceModel: "companyAddress",
      targetModel: "companyAddress",
      translation: companyAddressTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "companyAddressRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Contacts",
      sourceModel: "contact",
      targetModel: "contact",
      translation: contactTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "contactRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Corporate Locations",
      sourceModel: "ownerLevel",
      targetModel: "corporateLocation",
      translation: corporateLocationTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "ownerLevelRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Internal Departments",
      sourceModel: "department",
      targetModel: "internalDepartment",
      translation: internalDepartmentTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "departmentRecId",
      sourceUpdatedField: "lastUpdatedUTC",
    },
    {
      name: "Catalog Item Types",
      sourceModel: "productType",
      targetModel: "catalogItemType",
      translation: catalogItemTypeTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "typeRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Catalog Categories",
      sourceModel: "productCategory",
      targetModel: "catalogCategory",
      translation: catalogCategoryTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "categoryRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Catalog Subcategories",
      sourceModel: "productSubcategory",
      targetModel: "catalogSubcategory",
      translation: catalogSubcategoryTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "subcategoryRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Catalog Manufacturers",
      sourceModel: "manufacturer",
      targetModel: "catalogManufacturer",
      translation: catalogManufacturerTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "manufacturerRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Warehouse Bins",
      sourceModel: "warehouseBin",
      targetModel: "warehouseBin",
      translation: warehouseBinTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "warehouseBinRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Catalog Items",
      sourceModel: "productCatalog",
      targetModel: "catalogItem",
      translation: catalogItemTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "catalogRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Product Inventory",
      sourceModel: "productInventory",
      targetModel: "productInventory",
      translation: productInventoryTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "inventoryRecId",
      sourceUpdatedField: "lastUpdate",
    },
    {
      name: "Service Ticket Types",
      sourceModel: "srType",
      targetModel: "serviceTicketType",
      translation: serviceTicketTypeTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srTypeRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Service Ticket Locations",
      sourceModel: "srLocation",
      targetModel: "serviceTicketLocation",
      translation:
        serviceTicketLocationTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srLocationRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Service Ticket Sources",
      sourceModel: "srSource",
      targetModel: "serviceTicketSource",
      translation: serviceTicketSourceTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srSourceRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Service Ticket Impacts",
      sourceModel: "srImpact",
      targetModel: "serviceTicketImpact",
      translation: serviceTicketImpactTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srImpactRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Service Ticket Severities",
      sourceModel: "srSeverity",
      targetModel: "serviceTicketSeverity",
      translation:
        serviceTicketSeverityTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srSeverityRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Service Ticket Priorities",
      sourceModel: "srUrgency",
      targetModel: "serviceTicketPriority",
      translation:
        serviceTicketPriorityTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srUrgencyRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Service Ticket Boards",
      sourceModel: "srBoard",
      targetModel: "serviceTicketBoard",
      translation: serviceTicketBoardTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srBoardRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Tax Codes",
      sourceModel: "taxCode",
      targetModel: "taxCode",
      translation: taxCodeTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "taxCodeRecId",
      sourceUpdatedField: "lastUpdatedUtc",
      sourceArgs: {
        include: {
          levels: {
            where: { taxXref: { not: null } },
          },
        },
      },
    },
    {
      name: "Opportunity Stages",
      sourceModel: "soPipeline",
      targetModel: "opportunityStage",
      translation: opportunityStageTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "soPipelineRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Opportunity Types",
      sourceModel: "soType",
      targetModel: "opportunityType",
      translation: opportunityTypeTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "soTypeRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Opportunity Statuses",
      sourceModel: "soOppStatus",
      targetModel: "opportunityStatus",
      translation: opportunityStatusTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "soOppStatusRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Opportunities",
      sourceModel: "opportunity",
      targetModel: "opportunity",
      translation: opportunityTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "opportunityRecId",
      sourceUpdatedField: "lastUpdatedUtc",
      sourceArgs: {
        include: {
          members: {
            select: {
              memberRecId: true,
              primarySalesFlag: true,
              secondarySalesFlag: true,
            },
          },
          soOppStatus: {
            select: {
              closedFlag: true,
            },
          },
        },
      },
    },
    {
      name: "Service Tickets",
      sourceModel: "srService",
      targetModel: "serviceTicket",
      translation: serviceTicketTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "srServiceRecId",
      sourceUpdatedField: "lastUpdatedUtc",
    },
    {
      name: "Product Data",
      sourceModel: "iV_Product",
      targetModel: "productData",
      translation: productDataTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "ivProductRecId",
      sourceUpdatedField: "lastUpdatedUTC",
    },
    {
      name: "Service Ticket Notes",
      sourceModel: "ticketNote",
      targetModel: "serviceTicketNote",
      translation: serviceTicketNoteTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "ticketNoteRecId",
      sourceUpdatedField: "lastUpdatedUtc",
      sourceArgs: {
        orderBy: {
          dateCreated: "asc",
        },
      },
    },
    {
      name: "Schedule Statuses",
      sourceModel: "scheduleStatus",
      targetModel: "scheduleStatus",
      translation: scheduleStatusTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "scheduleStatusRecId",
      sourceUpdatedField: "lastUpdateUtc",
    },
    {
      name: "Schedule Types",
      sourceModel: "scheduleType",
      targetModel: "scheduleType",
      translation: scheduleTypeTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "scheduleTypeRecId",
      sourceUpdatedField: "lastUpdateUtc",
    },
    {
      name: "Schedule Spans",
      sourceModel: "scheduleSpan",
      targetModel: "scheduleSpan",
      translation: scheduleSpanTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "scheduleSpanRecId",
      // The record id doubles as the "updated" field here, and timestamp
      // comparison on the API side is disabled via targetUpdatedField: null.
      sourceUpdatedField: "scheduleSpanRecId",
      targetUpdatedField: null,
    },
    {
      name: "Schedules",
      sourceModel: "schedule",
      targetModel: "schedule",
      translation: scheduleTranslation as unknown as AnyTranslation,
      uniqueField: "id",
      sourceIdField: "scheduleRecId",
      sourceUpdatedField: "lastUpdateUtc",
    },
  ];

  type SummaryItem = {
    step: string;
    result: StepResult;
    deleteResult: DeleteResult;
  };

  try {
    await updateJobStatus("RUNNING");

    // Prime lookup context once up front; without it translations cannot run.
    try {
      await refreshContextFromApi(apiPrisma, context);
      await refreshCustomFieldContextFromCw(cwPrisma, context);
    } catch (contextError) {
      const errorMsg = describeError(contextError);
      console.error("[sync] Context refresh failed — aborting sync:", errorMsg);
      await updateJobStatus("FAILED", `Context refresh failed: ${errorMsg}`);
      return;
    }

    // `summary` keeps step order for the final report; `summaryByStep` lets the
    // delete pass attach its result to the same (shared) item object.
    const summary: SummaryItem[] = [];
    const summaryByStep = new Map<string, SummaryItem>();

    for (const step of steps) {
      // Check timeout between steps
      if (isTimedOut()) {
        console.error(`[sync] Timeout exceeded (${timeoutMs}ms) — aborting at step ${step.name}`);
        await updateJobStatus("TIMED_OUT", `Timed out at step: ${step.name}`);
        return;
      }

      // Refresh context before tables that depend heavily on cross-table lookups.
      if (apiContextRefreshTargets.has(step.targetModel)) {
        try {
          await refreshContextFromApi(apiPrisma, context);
          if (cwCustomFieldRefreshTargets.has(step.targetModel)) {
            await refreshCustomFieldContextFromCw(cwPrisma, context);
          }
        } catch (contextError) {
          const errorMsg = describeError(contextError);
          console.error(`[sync] Context refresh failed before ${step.name} — aborting:`, errorMsg);
          await updateJobStatus("FAILED", `Context refresh failed before ${step.name}: ${errorMsg}`);
          return;
        }
      }

      const stepStart = Date.now();
      console.log(`Starting ${step.name}...`);

      const decision = await computeSmartSyncDecision(
        cwPrisma,
        apiPrisma,
        step,
        forceIncremental
      );
      const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
        cwPrisma,
        step,
        forceIncremental
      );
      const forceCriticalFullSync = shouldForceCriticalFullSync(
        step,
        forceIncremental
      );

      // Precedence: forced periodic full sync > critical watermark decision > smart-sync decision.
      const effectiveDecision = forceCriticalFullSync
        ? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
        : criticalWatermarkDecision ?? decision;
      if (forceCriticalFullSync) {
        console.log(
          `  [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
        );
      }

      // `undefined` means "no id filter", i.e. syncStep processes the whole table.
      const sourceIdsFilter =
        effectiveDecision.mode === "incremental"
          ? effectiveDecision.sourceIds
          : undefined;
      console.log(
        `  [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
          effectiveDecision.mode
        }${
          effectiveDecision.mode === "incremental"
            ? ` (${effectiveDecision.sourceIds.length} ids)`
            : ""
        }`
      );
      if (logAllDifferences) {
        logAllSmartSyncDifferences(step, effectiveDecision.differences);
      }

      const result = await syncStep(
        cwPrisma,
        apiPrisma,
        context,
        step,
        sourceIdsFilter
      );
      const summaryItem: SummaryItem = {
        step: step.name,
        result,
        deleteResult: { deleted: 0, failed: 0 },
      };
      summary.push(summaryItem);
      summaryByStep.set(step.name, summaryItem);
      console.log(
        `${step.name}: upserted=${result.insertedOrUpdated} skipped=${result.skipped} failed=${result.failed}`
      );

      // NOTE(review): the step log is written before delete reconciliation runs,
      // so `recordsDeleted` is always persisted as 0 here.
      await writeStepLog(
        step.name,
        effectiveDecision.mode,
        result,
        { deleted: 0, failed: 0 },
        Date.now() - stepStart
      );
    }

    if (reconcileDeletes) {
      // Check timeout before delete reconciliation
      if (isTimedOut()) {
        console.error(`[sync] Timeout exceeded — skipping delete reconciliation`);
        await updateJobStatus("TIMED_OUT", "Timed out before delete reconciliation");
        return;
      }

      // Forced-incremental runs sweep a single step per invocation, rotating via
      // the module-level `incrementalDeleteStepIndex`; regular runs reconcile
      // every step in reverse order.
      const deleteSteps = forceIncremental
        ? [steps[incrementalDeleteStepIndex % steps.length]]
        : [...steps].reverse();
      if (forceIncremental) {
        const selected = deleteSteps[0];
        console.log(
          `[delete-reconcile] incremental sweep: ${selected.name} (${
            incrementalDeleteStepIndex + 1
          }/${steps.length})`
        );
        incrementalDeleteStepIndex =
          (incrementalDeleteStepIndex + 1) % steps.length;
      }

      for (const step of deleteSteps) {
        if (isTimedOut()) {
          console.error(`[sync] Timeout exceeded during delete reconciliation at ${step.name}`);
          await updateJobStatus("TIMED_OUT", `Timed out during delete reconciliation: ${step.name}`);
          return;
        }
        console.log(`Reconciling deletes for ${step.name}...`);
        const deleteResult = await reconcileStepDeletes(
          cwPrisma,
          apiPrisma,
          step
        );
        const summaryItem = summaryByStep.get(step.name);
        if (summaryItem) {
          summaryItem.deleteResult = deleteResult;
        }
        console.log(
          `${step.name}: deleted=${deleteResult.deleted} deleteFailed=${deleteResult.failed}`
        );
      }
    }

    console.log("\nSync complete.\n");
    for (const item of summary) {
      console.log(
        `${item.step}: upserted=${item.result.insertedOrUpdated}, skipped=${item.result.skipped}, failed=${item.result.failed}, deleted=${item.deleteResult.deleted}, deleteFailed=${item.deleteResult.failed}`
      );
    }
    await updateJobStatus("COMPLETED");
  } catch (error) {
    const errorMsg = describeError(error);
    console.error("[sync] Unhandled sync error:", errorMsg);
    // Best-effort: record the failure on the job run (helper truncates and never throws).
    await updateJobStatus("FAILED", errorMsg);
    throw error;
  } finally {
    // Release both connections independently: a failing CW disconnect must not
    // leak the API connection or replace the error being propagated.
    await disconnectQuietly("CW", cwPrisma);
    await disconnectQuietly("API", apiPrisma);
  }
};
/**
 * Convenience entry point for a forced-incremental sync run.
 *
 * Delegates to `executeFullDalpuriSync` with `forceIncremental` enabled and
 * per-difference logging disabled; all other options keep their defaults.
 *
 * @param options.jobRunId Optional job run id forwarded unchanged.
 * @returns A promise that settles exactly as the delegated sync does.
 */
export const executeForcedIncrementalDalpuriSync = async (options?: {
  jobRunId?: string;
}): Promise<void> => {
  const incrementalOptions = {
    forceIncremental: true,
    logAllDifferences: false,
    jobRunId: options?.jobRunId,
  };
  await executeFullDalpuriSync(incrementalOptions);
};
// CLI entry point: only runs when `import.meta.main` is truthy (i.e. the runtime
// reports this module as the program entry). Runs a sync with default options and
// maps the outcome onto the process exit code: 0 on success, 1 on failure.
if (import.meta.main) {
  const runFromCli = async (): Promise<void> => {
    try {
      await executeFullDalpuriSync();
      process.exit(0);
    } catch (error) {
      console.error("CW -> API sync failed:", error);
      process.exit(1);
    }
  };
  void runFromCli();
}