e0d575454e
User rows have a FK constraint to CwMember (User_cwMemberId_fkey). Syncing Users first caused all 140 User upserts to fail since the CwMember table was empty. This cascade failure then caused all Opportunity upserts to fail because Opportunity.primarySalesRepId is FK-constrained to User.cwIdentifier. Fix: reorder steps so CW Members syncs first, then Users.
1900 lines
53 KiB
TypeScript
1900 lines
53 KiB
TypeScript
import { PrismaMssql } from "@prisma/adapter-mssql";
|
|
import { PrismaPg } from "@prisma/adapter-pg";
|
|
import { readFileSync } from "node:fs";
|
|
import { resolve } from "node:path";
|
|
|
|
import { PrismaClient as CwPrismaClient } from "../generated/prisma/client";
|
|
import { PrismaClient as ApiPrismaClient } from "../../api/generated/prisma/client";
|
|
|
|
import {
|
|
catalogCategoryTranslation,
|
|
catalogItemTranslation,
|
|
catalogItemTypeTranslation,
|
|
catalogManufacturerTranslation,
|
|
catalogSubcategoryTranslation,
|
|
companyAddressTranslation,
|
|
companyTranslation,
|
|
contactTranslation,
|
|
corporateLocationTranslation,
|
|
createTranslationContext,
|
|
cwMemberTranslation,
|
|
internalDepartmentTranslation,
|
|
opportunityStageTranslation,
|
|
opportunityStatusTranslation,
|
|
opportunityTranslation,
|
|
opportunityTypeTranslation,
|
|
productDataTranslation,
|
|
productInventoryTranslation,
|
|
serviceTicketBoardTranslation,
|
|
serviceTicketImpactTranslation,
|
|
serviceTicketLocationTranslation,
|
|
serviceTicketNoteTranslation,
|
|
serviceTicketPriorityTranslation,
|
|
serviceTicketSeverityTranslation,
|
|
serviceTicketSourceTranslation,
|
|
serviceTicketTranslation,
|
|
serviceTicketTypeTranslation,
|
|
scheduleStatusTranslation,
|
|
scheduleTypeTranslation,
|
|
scheduleSpanTranslation,
|
|
scheduleTranslation,
|
|
taxCodeTranslation,
|
|
userTranslation,
|
|
warehouseBinTranslation,
|
|
type TranslationContext,
|
|
} from "./index";
|
|
import { Translation, SkipRowError } from "./translations/types";
|
|
|
|
type Row = Record<string, unknown>;
|
|
type AnyTranslation = Translation<Row, Row, TranslationContext>;
|
|
|
|
type Step = {
|
|
name: string;
|
|
sourceModel: string;
|
|
targetModel: string;
|
|
translation: AnyTranslation;
|
|
uniqueField: string;
|
|
sourceIdField: string;
|
|
sourceUpdatedField: string;
|
|
sourceArgs?: Record<string, unknown>;
|
|
targetDeleteWhere?: Record<string, unknown>;
|
|
/** API-side field used for timestamp comparison in smart-sync. Defaults to "updatedAt". Set to null for tables without timestamps. */
|
|
targetUpdatedField?: string | null;
|
|
};
|
|
|
|
type StepResult = {
|
|
insertedOrUpdated: number;
|
|
skipped: number;
|
|
failed: number;
|
|
};
|
|
|
|
type DeleteResult = {
|
|
deleted: number;
|
|
failed: number;
|
|
};
|
|
|
|
let incrementalDeleteStepIndex = 0;
|
|
|
|
const parseEnvFile = (path: string): Record<string, string> => {
|
|
const envData = readFileSync(path, "utf8");
|
|
const out: Record<string, string> = {};
|
|
|
|
for (const rawLine of envData.split(/\r?\n/)) {
|
|
const line = rawLine.trim();
|
|
if (!line || line.startsWith("#")) continue;
|
|
|
|
const index = line.indexOf("=");
|
|
if (index <= 0) continue;
|
|
|
|
const key = line.slice(0, index).trim();
|
|
let value = line.slice(index + 1).trim();
|
|
|
|
if (
|
|
(value.startsWith('"') && value.endsWith('"')) ||
|
|
(value.startsWith("'") && value.endsWith("'"))
|
|
) {
|
|
value = value.slice(1, -1);
|
|
}
|
|
|
|
out[key] = value;
|
|
}
|
|
|
|
return out;
|
|
};
|
|
|
|
const resolveApiDatabaseUrl = (): string => {
|
|
if (process.env.API_DATABASE_URL) return process.env.API_DATABASE_URL;
|
|
if (process.env.OPTIMA_API_DATABASE_URL)
|
|
return process.env.OPTIMA_API_DATABASE_URL;
|
|
|
|
const candidates = [
|
|
resolve(import.meta.dir, "../../api/.env"),
|
|
resolve(process.cwd(), "../api/.env"),
|
|
];
|
|
|
|
for (const apiEnvPath of candidates) {
|
|
try {
|
|
const apiEnv = parseEnvFile(apiEnvPath);
|
|
if (apiEnv.DATABASE_URL) {
|
|
return apiEnv.DATABASE_URL;
|
|
}
|
|
} catch {
|
|
// Try next path candidate.
|
|
}
|
|
}
|
|
|
|
return "";
|
|
};
|
|
|
|
const parseJsonMapFromEnv = (envKey: string): Map<number, string> => {
|
|
const raw = process.env[envKey];
|
|
if (!raw) return new Map();
|
|
|
|
try {
|
|
const parsed = JSON.parse(raw) as Record<string, string>;
|
|
const entries = Object.entries(parsed)
|
|
.map(([key, value]) => [Number.parseInt(key, 10), value] as const)
|
|
.filter(
|
|
([key, value]) => Number.isInteger(key) && typeof value === "string"
|
|
);
|
|
|
|
return new Map(entries);
|
|
} catch (error) {
|
|
console.warn(`Could not parse ${envKey} as JSON map. Ignoring.`, error);
|
|
return new Map();
|
|
}
|
|
};
|
|
|
|
const normalizeData = (data: Row): Row => {
|
|
const normalized: Row = {};
|
|
for (const [key, value] of Object.entries(data)) {
|
|
if (value !== undefined) {
|
|
normalized[key] = value;
|
|
}
|
|
}
|
|
return normalized;
|
|
};
|
|
|
|
const translateRow = (
|
|
row: Row,
|
|
translation: AnyTranslation,
|
|
context: TranslationContext
|
|
): Row => {
|
|
const out: Row = {};
|
|
|
|
for (const entry of translation.values) {
|
|
const fromKey = entry.from as string;
|
|
const toKey = entry.to as string;
|
|
const input = row[fromKey];
|
|
|
|
if (input === undefined) {
|
|
continue;
|
|
}
|
|
|
|
const output = entry.process
|
|
? entry.process(input as never, context, row)
|
|
: input;
|
|
out[toKey] = output as unknown;
|
|
}
|
|
|
|
return normalizeData(out);
|
|
};
|
|
|
|
const refreshContextFromApi = async (
|
|
apiPrisma: ApiPrismaClient,
|
|
context: TranslationContext
|
|
): Promise<void> => {
|
|
context.userIds.clear();
|
|
context.serviceTicketIds.clear();
|
|
context.opportunityIds.clear();
|
|
context.catalogItemIds.clear();
|
|
context.corporateLocationIds.clear();
|
|
context.companyIds.clear();
|
|
context.companyAddressIds.clear();
|
|
context.contactIds.clear();
|
|
context.opportunityStageIds.clear();
|
|
context.usersByMemberRecId.clear();
|
|
context.userIdentifiersByMemberRecId.clear();
|
|
context.usersByIdentifier.clear();
|
|
context.serviceTicketBoardUidsById.clear();
|
|
context.opportunityStatusIds.clear();
|
|
context.scheduleStatusIds.clear();
|
|
context.scheduleTypeIds.clear();
|
|
context.scheduleSpanIds.clear();
|
|
context.taxCodeIds.clear();
|
|
|
|
const [
|
|
users,
|
|
boards,
|
|
serviceTickets,
|
|
opportunities,
|
|
catalogItems,
|
|
locations,
|
|
opportunityStatuses,
|
|
companies,
|
|
companyAddresses,
|
|
contacts,
|
|
opportunityStages,
|
|
scheduleStatuses,
|
|
scheduleTypes,
|
|
scheduleSpans,
|
|
taxCodes,
|
|
] = await Promise.all([
|
|
apiPrisma.user.findMany({
|
|
select: {
|
|
id: true,
|
|
cwMemberId: true,
|
|
cwIdentifier: true,
|
|
},
|
|
}),
|
|
apiPrisma.serviceTicketBoard.findMany({
|
|
select: {
|
|
id: true,
|
|
uid: true,
|
|
},
|
|
}),
|
|
apiPrisma.serviceTicket.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.opportunity.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.catalogItem.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.corporateLocation.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.opportunityStatus.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.company.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.companyAddress.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.contact.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.opportunityStage.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.scheduleStatus.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.scheduleType.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.scheduleSpan.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
apiPrisma.taxCode.findMany({
|
|
select: {
|
|
id: true,
|
|
},
|
|
}),
|
|
]);
|
|
|
|
const defaultOpportunityType = await apiPrisma.opportunityType.findFirst({
|
|
orderBy: { id: "asc" },
|
|
select: { id: true },
|
|
});
|
|
|
|
for (const user of users) {
|
|
context.userIds.add(user.id);
|
|
|
|
if (user.cwMemberId != null) {
|
|
context.usersByMemberRecId.set(user.cwMemberId, user.id);
|
|
if (user.cwIdentifier) {
|
|
context.userIdentifiersByMemberRecId.set(
|
|
user.cwMemberId,
|
|
user.cwIdentifier
|
|
);
|
|
}
|
|
}
|
|
|
|
if (user.cwIdentifier) {
|
|
context.usersByIdentifier.set(user.cwIdentifier, user.id);
|
|
}
|
|
}
|
|
|
|
const cwMembers = await apiPrisma.cwMember.findMany({
|
|
select: { cwMemberId: true, identifier: true },
|
|
});
|
|
for (const member of cwMembers) {
|
|
if (
|
|
member.cwMemberId != null &&
|
|
member.identifier &&
|
|
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
|
|
) {
|
|
context.userIdentifiersByMemberRecId.set(
|
|
member.cwMemberId,
|
|
member.identifier
|
|
);
|
|
}
|
|
}
|
|
|
|
for (const board of boards) {
|
|
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
|
}
|
|
|
|
for (const serviceTicket of serviceTickets) {
|
|
context.serviceTicketIds.add(serviceTicket.id);
|
|
}
|
|
|
|
for (const opportunity of opportunities) {
|
|
context.opportunityIds.add(opportunity.id);
|
|
}
|
|
|
|
for (const catalogItem of catalogItems) {
|
|
context.catalogItemIds.add(catalogItem.id);
|
|
}
|
|
|
|
for (const location of locations) {
|
|
context.corporateLocationIds.add(location.id);
|
|
}
|
|
|
|
for (const status of opportunityStatuses) {
|
|
context.opportunityStatusIds.add(status.id);
|
|
}
|
|
|
|
for (const company of companies) {
|
|
context.companyIds.add(company.id);
|
|
}
|
|
|
|
for (const address of companyAddresses) {
|
|
context.companyAddressIds.add(address.id);
|
|
}
|
|
|
|
for (const contact of contacts) {
|
|
context.contactIds.add(contact.id);
|
|
}
|
|
|
|
for (const stage of opportunityStages) {
|
|
context.opportunityStageIds.add(stage.id);
|
|
}
|
|
|
|
for (const status of scheduleStatuses) {
|
|
context.scheduleStatusIds.add(status.id);
|
|
}
|
|
|
|
for (const type of scheduleTypes) {
|
|
context.scheduleTypeIds.add(type.id);
|
|
}
|
|
|
|
for (const span of scheduleSpans) {
|
|
context.scheduleSpanIds.add(span.id);
|
|
}
|
|
|
|
for (const taxCode of taxCodes) {
|
|
context.taxCodeIds.add(taxCode.id);
|
|
}
|
|
|
|
context.defaultOpportunityTypeId = defaultOpportunityType?.id ?? null;
|
|
|
|
// Optional context fed from env-provided maps.
|
|
context.billingTypeByTicketId = parseJsonMapFromEnv(
|
|
"BILLING_TYPE_CONTEXT_JSON"
|
|
);
|
|
context.billingInstructionsByTicketId = parseJsonMapFromEnv(
|
|
"BILLING_INSTRUCTIONS_CONTEXT_JSON"
|
|
);
|
|
};
|
|
|
|
const normalizeNarrativeFieldName = (value: unknown): string => {
|
|
if (typeof value !== "string") return "";
|
|
return value.trim().toLowerCase();
|
|
};
|
|
|
|
const isNarrativeCustomField = (fieldName: string): boolean => {
|
|
if (!fieldName) return false;
|
|
return fieldName.includes("narrative");
|
|
};
|
|
|
|
const normalizeNullableText = (value: unknown): string | null => {
|
|
if (typeof value !== "string") return null;
|
|
const trimmed = value.trim();
|
|
return trimmed.length > 0 ? trimmed : null;
|
|
};
|
|
|
|
const refreshCustomFieldContextFromCw = async (
|
|
cwPrisma: CwPrismaClient,
|
|
context: TranslationContext
|
|
): Promise<void> => {
|
|
context.opportunityNarrativeByOpportunityId.clear();
|
|
context.productCustomByIvProductId.clear();
|
|
|
|
const [productCustomRows, opportunityCustomRows] = await Promise.all([
|
|
cwPrisma.$queryRawUnsafe<Array<Record<string, unknown>>>(`
|
|
SELECT
|
|
IV_Product_RecID,
|
|
ProcurementNotes,
|
|
ProductNarrative
|
|
FROM dbo.v_IV_Product_Custom_Fields
|
|
`),
|
|
cwPrisma.$queryRawUnsafe<Array<Record<string, unknown>>>(`
|
|
SELECT
|
|
Opportunity_RecID,
|
|
opportunity_customfield,
|
|
opportunity_customvalue
|
|
FROM dbo.v_rpt_OpportunityCustomFields
|
|
WHERE opportunity_customvalue IS NOT NULL
|
|
`),
|
|
]);
|
|
|
|
for (const row of productCustomRows) {
|
|
const idRaw = row.IV_Product_RecID;
|
|
const ivProductRecId =
|
|
typeof idRaw === "number" ? idRaw : Number.parseInt(String(idRaw), 10);
|
|
if (!Number.isInteger(ivProductRecId)) continue;
|
|
|
|
context.productCustomByIvProductId.set(ivProductRecId, {
|
|
procurementNotes: normalizeNullableText(row.ProcurementNotes),
|
|
productNarrative: normalizeNullableText(row.ProductNarrative),
|
|
});
|
|
}
|
|
|
|
for (const row of opportunityCustomRows) {
|
|
const idRaw = row.Opportunity_RecID;
|
|
const opportunityRecId =
|
|
typeof idRaw === "number" ? idRaw : Number.parseInt(String(idRaw), 10);
|
|
if (!Number.isInteger(opportunityRecId)) continue;
|
|
|
|
const fieldName = normalizeNarrativeFieldName(row.opportunity_customfield);
|
|
if (!isNarrativeCustomField(fieldName)) continue;
|
|
|
|
const narrativeValue = normalizeNullableText(row.opportunity_customvalue);
|
|
if (!narrativeValue) continue;
|
|
|
|
// First narrative-looking custom field wins for each opportunity.
|
|
if (!context.opportunityNarrativeByOpportunityId.has(opportunityRecId)) {
|
|
context.opportunityNarrativeByOpportunityId.set(
|
|
opportunityRecId,
|
|
narrativeValue
|
|
);
|
|
}
|
|
}
|
|
};
|
|
|
|
const sanitizeUserForeignKeys = (
|
|
data: Row,
|
|
context: TranslationContext,
|
|
targetModel: string
|
|
): Row => {
|
|
if (
|
|
targetModel !== "serviceTicket" &&
|
|
targetModel !== "serviceTicketNote" &&
|
|
targetModel !== "company" &&
|
|
targetModel !== "companyAddress"
|
|
) {
|
|
return data;
|
|
}
|
|
|
|
const sanitized = { ...data };
|
|
|
|
const userIdFields =
|
|
targetModel === "serviceTicketNote"
|
|
? ["authorId"]
|
|
: targetModel === "serviceTicket"
|
|
? ["createdById", "updatedById", "closedById"]
|
|
: [];
|
|
|
|
const identifierFields =
|
|
targetModel === "serviceTicket"
|
|
? ["ticketOwnerId"]
|
|
: targetModel === "company"
|
|
? ["enteredById", "deletedById"]
|
|
: targetModel === "companyAddress"
|
|
? ["updatedById"]
|
|
: [];
|
|
|
|
for (const field of userIdFields) {
|
|
const value = sanitized[field];
|
|
if (typeof value === "string" && !context.userIds.has(value)) {
|
|
sanitized[field] = null;
|
|
}
|
|
}
|
|
|
|
for (const field of identifierFields) {
|
|
const value = sanitized[field];
|
|
if (typeof value === "string" && !context.usersByIdentifier.has(value)) {
|
|
sanitized[field] = null;
|
|
}
|
|
}
|
|
|
|
return sanitized;
|
|
};
|
|
|
|
const sanitizeModelData = (
|
|
data: Row,
|
|
step: Step,
|
|
context: TranslationContext
|
|
): Row => {
|
|
const sanitized = { ...data };
|
|
|
|
if (step.targetModel === "warehouseBin") {
|
|
if (sanitized.minQuantity == null) sanitized.minQuantity = 0;
|
|
if (sanitized.maxQuantity == null) sanitized.maxQuantity = 0;
|
|
}
|
|
|
|
if (step.targetModel === "contact") {
|
|
// Nullify companyAddressId if the address doesn't exist in the API
|
|
if (
|
|
sanitized.companyAddressId != null &&
|
|
!context.companyAddressIds.has(sanitized.companyAddressId as number)
|
|
) {
|
|
sanitized.companyAddressId = null;
|
|
}
|
|
// Nullify companyId if the company doesn't exist in the API
|
|
if (
|
|
sanitized.companyId != null &&
|
|
!context.companyIds.has(sanitized.companyId as number)
|
|
) {
|
|
sanitized.companyId = null;
|
|
}
|
|
}
|
|
|
|
if (step.targetModel === "companyAddress") {
|
|
// companyId is required — skip the row if the parent company hasn't synced yet
|
|
if (
|
|
sanitized.companyId == null ||
|
|
!context.companyIds.has(sanitized.companyId as number)
|
|
) {
|
|
throw new SkipRowError(
|
|
`CompanyAddress companyId=${sanitized.companyId} not found in API — skipping`
|
|
);
|
|
}
|
|
}
|
|
|
|
if (step.targetModel === "serviceTicket") {
|
|
const boardId = sanitized.serviceTicketBoardId;
|
|
if (typeof boardId === "string") {
|
|
const parsed = Number.parseInt(boardId, 10);
|
|
sanitized.serviceTicketBoardId = Number.isNaN(parsed) ? null : parsed;
|
|
}
|
|
// Nullify contactId if the contact doesn't exist in the API
|
|
if (
|
|
sanitized.contactId != null &&
|
|
!context.contactIds.has(sanitized.contactId as number)
|
|
) {
|
|
sanitized.contactId = null;
|
|
}
|
|
// Nullify companyAddressId if the address doesn't exist
|
|
if (
|
|
sanitized.companyAddressId != null &&
|
|
!context.companyAddressIds.has(sanitized.companyAddressId as number)
|
|
) {
|
|
sanitized.companyAddressId = null;
|
|
}
|
|
// Nullify companyId if the company doesn't exist
|
|
if (
|
|
sanitized.companyId != null &&
|
|
!context.companyIds.has(sanitized.companyId as number)
|
|
) {
|
|
sanitized.companyId = null;
|
|
}
|
|
// Nullify billingCompanyId if the company doesn't exist
|
|
if (
|
|
sanitized.billingCompanyId != null &&
|
|
!context.companyIds.has(sanitized.billingCompanyId as number)
|
|
) {
|
|
sanitized.billingCompanyId = null;
|
|
}
|
|
// Nullify billingAddressId if the address doesn't exist
|
|
if (
|
|
sanitized.billingAddressId != null &&
|
|
!context.companyAddressIds.has(sanitized.billingAddressId as number)
|
|
) {
|
|
sanitized.billingAddressId = null;
|
|
}
|
|
}
|
|
|
|
if (step.targetModel === "opportunity") {
|
|
if (sanitized.typeId == null && context.defaultOpportunityTypeId != null) {
|
|
sanitized.typeId = context.defaultOpportunityTypeId;
|
|
}
|
|
if (
|
|
sanitized.statusId != null &&
|
|
!context.opportunityStatusIds.has(sanitized.statusId as number)
|
|
) {
|
|
sanitized.statusId = null;
|
|
}
|
|
// Nullify companyId if the company doesn't exist
|
|
if (
|
|
sanitized.companyId != null &&
|
|
!context.companyIds.has(sanitized.companyId as number)
|
|
) {
|
|
sanitized.companyId = null;
|
|
}
|
|
// Nullify contactId if the contact doesn't exist
|
|
if (
|
|
sanitized.contactId != null &&
|
|
!context.contactIds.has(sanitized.contactId as number)
|
|
) {
|
|
sanitized.contactId = null;
|
|
}
|
|
// Nullify siteId (companyAddressId) if the address doesn't exist
|
|
if (
|
|
sanitized.siteId != null &&
|
|
!context.companyAddressIds.has(sanitized.siteId as number)
|
|
) {
|
|
sanitized.siteId = null;
|
|
}
|
|
// Nullify stageId if the stage doesn't exist
|
|
if (
|
|
sanitized.stageId != null &&
|
|
!context.opportunityStageIds.has(sanitized.stageId as number)
|
|
) {
|
|
sanitized.stageId = null;
|
|
}
|
|
// Nullify locationId if the corporate location doesn't exist
|
|
if (
|
|
sanitized.locationId != null &&
|
|
!context.corporateLocationIds.has(sanitized.locationId as number)
|
|
) {
|
|
sanitized.locationId = null;
|
|
}
|
|
// Nullify taxCodeId if the tax code hasn't synced yet
|
|
if (
|
|
sanitized.taxCodeId != null &&
|
|
!context.taxCodeIds.has(sanitized.taxCodeId as number)
|
|
) {
|
|
sanitized.taxCodeId = null;
|
|
}
|
|
}
|
|
|
|
if (step.targetModel === "schedule") {
|
|
// Nullify statusId if the referenced ScheduleStatus hasn't synced yet
|
|
if (
|
|
sanitized.statusId != null &&
|
|
!context.scheduleStatusIds.has(sanitized.statusId as number)
|
|
) {
|
|
sanitized.statusId = null;
|
|
}
|
|
// Nullify typeId if the referenced ScheduleType hasn't synced yet
|
|
if (
|
|
sanitized.typeId != null &&
|
|
!context.scheduleTypeIds.has(sanitized.typeId as number)
|
|
) {
|
|
sanitized.typeId = null;
|
|
}
|
|
// Nullify scheduleSpanId if the referenced ScheduleSpan hasn't synced yet
|
|
if (
|
|
sanitized.scheduleSpanId != null &&
|
|
!context.scheduleSpanIds.has(sanitized.scheduleSpanId as number)
|
|
) {
|
|
sanitized.scheduleSpanId = null;
|
|
}
|
|
}
|
|
|
|
return sanitized;
|
|
};
|
|
|
|
const asRecord = (value: unknown): Record<string, unknown> | null => {
|
|
if (typeof value !== "object" || value == null) {
|
|
return null;
|
|
}
|
|
|
|
return value as Record<string, unknown>;
|
|
};
|
|
|
|
const parseFieldsFromErrorMessage = (message: string): string[] => {
|
|
const fields = new Set<string>();
|
|
|
|
// Handles patterns like (`"cwIdentifier"`) and (`login`)
|
|
for (const match of message.matchAll(/`\"([^\"]+)\"`|`([^`]+)`/g)) {
|
|
const candidate = (match[1] ?? match[2] ?? "").trim();
|
|
if (!candidate) continue;
|
|
|
|
// Skip common non-field tokens that can appear in error messages.
|
|
if (candidate === "targetDelegate.upsert()" || candidate === "invocation") {
|
|
continue;
|
|
}
|
|
|
|
fields.add(candidate);
|
|
}
|
|
|
|
return [...fields];
|
|
};
|
|
|
|
const getUniqueConstraintFields = (error: unknown): string[] => {
|
|
const errorRecord = asRecord(error);
|
|
if (!errorRecord || errorRecord.code !== "P2002") {
|
|
return [];
|
|
}
|
|
|
|
const meta = asRecord(errorRecord.meta);
|
|
if (!meta) {
|
|
return [];
|
|
}
|
|
|
|
const target = meta.target;
|
|
if (Array.isArray(target)) {
|
|
return target.filter((field): field is string => typeof field === "string");
|
|
}
|
|
|
|
if (typeof target === "string") {
|
|
return [target];
|
|
}
|
|
|
|
if (typeof errorRecord.message === "string") {
|
|
return parseFieldsFromErrorMessage(errorRecord.message);
|
|
}
|
|
|
|
return [];
|
|
};
|
|
|
|
const formatValue = (value: unknown): string => {
|
|
if (value instanceof Date) {
|
|
return value.toISOString();
|
|
}
|
|
|
|
if (typeof value === "string") {
|
|
return JSON.stringify(value);
|
|
}
|
|
|
|
if (
|
|
typeof value === "number" ||
|
|
typeof value === "boolean" ||
|
|
value == null
|
|
) {
|
|
return String(value);
|
|
}
|
|
|
|
try {
|
|
return JSON.stringify(value);
|
|
} catch {
|
|
return "[unserializable]";
|
|
}
|
|
};
|
|
|
|
const formatUniqueConstraintError = (
|
|
error: unknown,
|
|
data: Row | null
|
|
): string => {
|
|
const fields = getUniqueConstraintFields(error);
|
|
|
|
if (fields.length === 0) {
|
|
return "Unique constraint failed.";
|
|
}
|
|
|
|
const fieldValues = fields.map((field) => {
|
|
const value = data ? data[field] : undefined;
|
|
return `${field}=${formatValue(value)}`;
|
|
});
|
|
|
|
return `Unique constraint failed on field(s): ${fields.join(
|
|
", "
|
|
)}. Values: ${fieldValues.join(", ")}`;
|
|
};
|
|
|
|
const toComparableKey = (value: unknown): string | null => {
|
|
if (value == null) {
|
|
return null;
|
|
}
|
|
|
|
if (typeof value === "number") {
|
|
if (Number.isNaN(value)) {
|
|
return null;
|
|
}
|
|
return `number:${value}`;
|
|
}
|
|
|
|
if (typeof value === "string") {
|
|
return `string:${value}`;
|
|
}
|
|
|
|
if (typeof value === "bigint") {
|
|
return `bigint:${value.toString()}`;
|
|
}
|
|
|
|
if (typeof value === "boolean") {
|
|
return `boolean:${value}`;
|
|
}
|
|
|
|
return null;
|
|
};
|
|
|
|
const reconcileStepDeletes = async (
|
|
cwPrisma: CwPrismaClient,
|
|
apiPrisma: ApiPrismaClient,
|
|
step: Step
|
|
): Promise<DeleteResult> => {
|
|
const sourceDelegate = (
|
|
cwPrisma as unknown as Record<string, { findMany: Function }>
|
|
)[step.sourceModel];
|
|
const targetDelegate = (
|
|
apiPrisma as unknown as Record<
|
|
string,
|
|
{ findMany: Function; deleteMany: Function; delete: Function }
|
|
>
|
|
)[step.targetModel];
|
|
|
|
if (!sourceDelegate) {
|
|
throw new Error(`CW delegate not found: ${step.sourceModel}`);
|
|
}
|
|
|
|
if (!targetDelegate) {
|
|
throw new Error(`API delegate not found: ${step.targetModel}`);
|
|
}
|
|
|
|
const sourceWhere =
|
|
(step.sourceArgs as Record<string, unknown> | undefined)?.where ??
|
|
undefined;
|
|
|
|
const sourceRows = (await sourceDelegate.findMany({
|
|
...(sourceWhere ? { where: sourceWhere } : {}),
|
|
select: {
|
|
[step.sourceIdField]: true,
|
|
},
|
|
})) as Row[];
|
|
|
|
const sourceKeys = new Set<string>();
|
|
for (const sourceRow of sourceRows) {
|
|
const sourceKey = toComparableKey(sourceRow[step.sourceIdField]);
|
|
if (sourceKey) {
|
|
sourceKeys.add(sourceKey);
|
|
}
|
|
}
|
|
|
|
const targetRows = (await targetDelegate.findMany({
|
|
...(step.targetDeleteWhere ? { where: step.targetDeleteWhere } : {}),
|
|
select: {
|
|
[step.uniqueField]: true,
|
|
},
|
|
})) as Row[];
|
|
|
|
const staleValues: Array<string | number | bigint | boolean> = [];
|
|
|
|
for (const targetRow of targetRows) {
|
|
const value = targetRow[step.uniqueField];
|
|
const targetKey = toComparableKey(value);
|
|
|
|
if (!targetKey) {
|
|
continue;
|
|
}
|
|
|
|
if (!sourceKeys.has(targetKey)) {
|
|
staleValues.push(value as string | number | bigint | boolean);
|
|
}
|
|
}
|
|
|
|
if (staleValues.length === 0) {
|
|
return { deleted: 0, failed: 0 };
|
|
}
|
|
|
|
const chunkSize = 500;
|
|
let deleted = 0;
|
|
let failed = 0;
|
|
let sampleErrorsPrinted = 0;
|
|
|
|
for (let index = 0; index < staleValues.length; index += chunkSize) {
|
|
const chunk = staleValues.slice(index, index + chunkSize);
|
|
|
|
try {
|
|
const deleteManyResult = (await targetDelegate.deleteMany({
|
|
where: {
|
|
...(step.targetDeleteWhere ?? {}),
|
|
[step.uniqueField]: { in: chunk },
|
|
},
|
|
})) as unknown;
|
|
|
|
const deletedCount = asRecord(deleteManyResult)?.count as
|
|
| number
|
|
| undefined;
|
|
deleted += typeof deletedCount === "number" ? deletedCount : chunk.length;
|
|
continue;
|
|
} catch {
|
|
// Fall back to row-by-row deletes so one FK violation does not block all deletions.
|
|
}
|
|
|
|
for (const value of chunk) {
|
|
try {
|
|
await targetDelegate.delete({
|
|
where: {
|
|
[step.uniqueField]: value,
|
|
},
|
|
});
|
|
|
|
deleted += 1;
|
|
} catch (error) {
|
|
failed += 1;
|
|
if (sampleErrorsPrinted < 5) {
|
|
sampleErrorsPrinted += 1;
|
|
const message =
|
|
error instanceof Error ? error.message : "Unknown delete error";
|
|
console.error(
|
|
`Delete reconcile failed in ${step.name} for ${
|
|
step.uniqueField
|
|
}=${formatValue(value)}:`,
|
|
message
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (failed > sampleErrorsPrinted) {
|
|
console.error(
|
|
`${step.name}: suppressed ${
|
|
failed - sampleErrorsPrinted
|
|
} additional delete errors`
|
|
);
|
|
}
|
|
|
|
return { deleted, failed };
|
|
};
|
|
|
|
type SmartSyncDecision =
|
|
| { mode: "full"; differences: SmartSyncDifference[] }
|
|
| {
|
|
mode: "incremental";
|
|
sourceIds: number[];
|
|
differences: SmartSyncDifference[];
|
|
};
|
|
|
|
type SmartSyncDifference = {
|
|
sourceId: number;
|
|
reason: "missing-in-api" | "cw-newer";
|
|
cwUpdatedAt: Date;
|
|
apiUpdatedAt: Date | null;
|
|
};
|
|
|
|
const logAllSmartSyncDifferences = (
|
|
step: Step,
|
|
differences: SmartSyncDifference[]
|
|
): void => {
|
|
if (differences.length === 0) {
|
|
console.log(
|
|
` [smart-sync][details] ${step.name}: no differences in sampled records`
|
|
);
|
|
return;
|
|
}
|
|
|
|
console.log(
|
|
` [smart-sync][details] ${step.name}: logging all ${differences.length} differences`
|
|
);
|
|
|
|
for (const diff of differences) {
|
|
const apiUpdated = diff.apiUpdatedAt
|
|
? diff.apiUpdatedAt.toISOString()
|
|
: "null";
|
|
console.log(
|
|
` [diff] sourceModel=${step.sourceModel} targetModel=${
|
|
step.targetModel
|
|
} id=${diff.sourceId} reason=${
|
|
diff.reason
|
|
} cwUpdatedAt=${diff.cwUpdatedAt.toISOString()} apiUpdatedAt=${apiUpdated}`
|
|
);
|
|
}
|
|
};
|
|
|
|
const computeSmartSyncDecision = async (
|
|
cwPrisma: CwPrismaClient,
|
|
apiPrisma: ApiPrismaClient,
|
|
step: Step,
|
|
forceIncremental = false
|
|
): Promise<SmartSyncDecision> => {
|
|
const cwDelegate = (
|
|
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
|
)[step.sourceModel];
|
|
const apiDelegate = (
|
|
apiPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
|
)[step.targetModel];
|
|
|
|
if (!cwDelegate || !apiDelegate) {
|
|
return { mode: "full", differences: [] };
|
|
}
|
|
|
|
const existingWhere =
|
|
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
|
|
|
|
// Count-based check: force a full sync when the two databases have diverged
|
|
// significantly in either direction.
|
|
// CW >> API → rows are missing from API (old records never in recency window)
|
|
// API >> CW → stale rows exist in API (need delete reconciliation via full sync)
|
|
if (!forceIncremental) {
|
|
const [cwCount, apiCount] = await Promise.all([
|
|
(cwDelegate as unknown as { count: Function }).count({ where: existingWhere }),
|
|
(apiDelegate as unknown as { count: Function }).count(),
|
|
]);
|
|
const gap = cwCount - apiCount; // positive = API missing rows, negative = API has extra rows
|
|
const threshold = Math.max(50, Math.floor(Math.max(cwCount, apiCount) * 0.05));
|
|
if (Math.abs(gap) > threshold) {
|
|
const direction = gap > 0 ? "api-missing-rows" : "api-has-stale-rows";
|
|
console.log(
|
|
` [smart-sync] ${step.name}: count gap detected (cw=${cwCount}, api=${apiCount}, gap=${gap}, direction=${direction}) — forcing full sync`
|
|
);
|
|
return { mode: "full", differences: [] };
|
|
}
|
|
}
|
|
|
|
// Fetch top 1000 CW records: only id + updatedAt
|
|
const cwSample = (await cwDelegate.findMany({
|
|
select: {
|
|
[step.sourceIdField]: true,
|
|
[step.sourceUpdatedField]: true,
|
|
},
|
|
where: existingWhere,
|
|
orderBy: { [step.sourceUpdatedField]: "desc" },
|
|
take: 1000,
|
|
})) as Row[];
|
|
|
|
if (cwSample.length === 0) {
|
|
return { mode: "incremental", sourceIds: [], differences: [] };
|
|
}
|
|
|
|
const cwIds = cwSample.map((r) => r[step.sourceIdField] as number);
|
|
|
|
// If the API model has no timestamp field, skip incremental comparison and do a full sync.
|
|
const apiUpdatedField = step.targetUpdatedField === undefined ? "updatedAt" : step.targetUpdatedField;
|
|
if (apiUpdatedField === null) {
|
|
return { mode: "full", differences: [] };
|
|
}
|
|
|
|
// Fetch the corresponding API records for comparison
|
|
const apiSample = (await apiDelegate.findMany({
|
|
select: {
|
|
[step.uniqueField]: true,
|
|
[apiUpdatedField]: true,
|
|
},
|
|
where: { [step.uniqueField]: { in: cwIds } },
|
|
})) as Row[];
|
|
|
|
const apiMap = new Map<number, Date>();
|
|
for (const row of apiSample) {
|
|
const id = row[step.uniqueField] as number;
|
|
apiMap.set(id, (row[apiUpdatedField] as Date | null) ?? new Date(0));
|
|
}
|
|
|
|
let differences = 0;
|
|
const differenceRows: SmartSyncDifference[] = [];
|
|
for (const cwRow of cwSample) {
|
|
const cwId = cwRow[step.sourceIdField] as number;
|
|
const cwUpdated =
|
|
(cwRow[step.sourceUpdatedField] as Date | null) ?? new Date(0);
|
|
|
|
if (!apiMap.has(cwId)) {
|
|
differences++;
|
|
differenceRows.push({
|
|
sourceId: cwId,
|
|
reason: "missing-in-api",
|
|
cwUpdatedAt: cwUpdated,
|
|
apiUpdatedAt: null,
|
|
});
|
|
} else {
|
|
const apiUpdated = apiMap.get(cwId)!;
|
|
if (cwUpdated.getTime() > apiUpdated.getTime()) {
|
|
differences++;
|
|
differenceRows.push({
|
|
sourceId: cwId,
|
|
reason: "cw-newer",
|
|
cwUpdatedAt: cwUpdated,
|
|
apiUpdatedAt: apiUpdated,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
console.log(
|
|
` [smart-sync] ${step.name}: ${differences} differences in top 1000`
|
|
);
|
|
|
|
if (forceIncremental) {
|
|
return {
|
|
mode: "incremental",
|
|
sourceIds: differenceRows.map((d) => d.sourceId),
|
|
differences: differenceRows,
|
|
};
|
|
}
|
|
|
|
if (differences > 100) {
|
|
return { mode: "full", differences: differenceRows };
|
|
}
|
|
|
|
return {
|
|
mode: "incremental",
|
|
sourceIds: differenceRows.map((d) => d.sourceId),
|
|
differences: differenceRows,
|
|
};
|
|
};
|
|
|
|
const syncStep = async (
|
|
cwPrisma: CwPrismaClient,
|
|
apiPrisma: ApiPrismaClient,
|
|
context: TranslationContext,
|
|
step: Step,
|
|
sourceIdsFilter?: number[]
|
|
): Promise<StepResult> => {
|
|
const sourceDelegate = (
|
|
cwPrisma as unknown as Record<string, { findMany: Function }>
|
|
)[step.sourceModel];
|
|
const targetDelegate = (
|
|
apiPrisma as unknown as Record<string, { upsert: Function }>
|
|
)[step.targetModel];
|
|
|
|
if (!sourceDelegate) {
|
|
throw new Error(`CW delegate not found: ${step.sourceModel}`);
|
|
}
|
|
|
|
if (!targetDelegate) {
|
|
throw new Error(`API delegate not found: ${step.targetModel}`);
|
|
}
|
|
|
|
if (sourceIdsFilter !== undefined && sourceIdsFilter.length === 0) {
|
|
return { insertedOrUpdated: 0, skipped: 0, failed: 0 };
|
|
}
|
|
|
|
const baseArgs: Record<string, unknown> = step.sourceArgs ?? {};
|
|
|
|
let insertedOrUpdated = 0;
|
|
let skipped = 0;
|
|
let failed = 0;
|
|
let sampleErrorsPrinted = 0;
|
|
|
|
const processRow = async (row: Row) => {
|
|
let translatedData: Row | null = null;
|
|
|
|
try {
|
|
const translated = translateRow(row, step.translation, context);
|
|
const userSanitized = sanitizeUserForeignKeys(
|
|
translated,
|
|
context,
|
|
step.targetModel
|
|
);
|
|
const data = sanitizeModelData(userSanitized, step, context);
|
|
translatedData = data;
|
|
const uniqueValue = data[step.uniqueField];
|
|
|
|
if (
|
|
uniqueValue === undefined ||
|
|
uniqueValue === null ||
|
|
uniqueValue === ""
|
|
) {
|
|
skipped += 1;
|
|
return;
|
|
}
|
|
|
|
await targetDelegate.upsert({
|
|
where: {
|
|
[step.uniqueField]: uniqueValue,
|
|
},
|
|
create: data,
|
|
update: data,
|
|
});
|
|
|
|
insertedOrUpdated += 1;
|
|
} catch (error) {
|
|
if (error instanceof SkipRowError) {
|
|
skipped += 1;
|
|
return;
|
|
}
|
|
|
|
failed += 1;
|
|
if (sampleErrorsPrinted < 5) {
|
|
sampleErrorsPrinted += 1;
|
|
const uniqueFields = getUniqueConstraintFields(error);
|
|
const message =
|
|
uniqueFields.length > 0
|
|
? formatUniqueConstraintError(error, translatedData)
|
|
: error instanceof Error
|
|
? error.message
|
|
: "Unknown row sync error";
|
|
console.error(
|
|
`Failed row in ${step.name} (source ${step.sourceModel} -> target ${step.targetModel}):`,
|
|
message
|
|
);
|
|
}
|
|
}
|
|
};
|
|
|
|
if (sourceIdsFilter !== undefined) {
|
|
// Incremental sync: fetch only the specific IDs
|
|
const existingWhere = (baseArgs as Record<string, unknown>).where ?? {};
|
|
const findManyArgs = {
|
|
...baseArgs,
|
|
where: {
|
|
...(existingWhere as Record<string, unknown>),
|
|
[step.sourceIdField]: { in: sourceIdsFilter },
|
|
},
|
|
};
|
|
const rows = (await sourceDelegate.findMany(findManyArgs)) as Row[];
|
|
for (const row of rows) {
|
|
await processRow(row);
|
|
}
|
|
} else {
|
|
// Full sync: paginate through all records to avoid loading 100K+ rows at once
|
|
const FULL_SYNC_PAGE_SIZE = 5000;
|
|
let skip = 0;
|
|
while (true) {
|
|
const pageArgs: Record<string, unknown> = {
|
|
...baseArgs,
|
|
orderBy: { [step.sourceIdField]: "asc" as const },
|
|
take: FULL_SYNC_PAGE_SIZE,
|
|
skip,
|
|
};
|
|
const pageRows = (await sourceDelegate.findMany(pageArgs)) as Row[];
|
|
for (const row of pageRows) {
|
|
await processRow(row);
|
|
}
|
|
if (pageRows.length < FULL_SYNC_PAGE_SIZE) break;
|
|
skip += FULL_SYNC_PAGE_SIZE;
|
|
}
|
|
}
|
|
|
|
if (failed > sampleErrorsPrinted) {
|
|
console.error(
|
|
`${step.name}: suppressed ${
|
|
failed - sampleErrorsPrinted
|
|
} additional row errors`
|
|
);
|
|
}
|
|
|
|
return { insertedOrUpdated, skipped, failed };
|
|
};
|
|
|
|
export const executeFullDalpuriSync = async (options?: {
|
|
forceIncremental?: boolean;
|
|
logAllDifferences?: boolean;
|
|
reconcileDeletes?: boolean;
|
|
jobRunId?: string;
|
|
timeoutMs?: number;
|
|
}): Promise<void> => {
|
|
const forceIncremental = options?.forceIncremental === true;
|
|
const logAllDifferences = options?.logAllDifferences ?? !forceIncremental;
|
|
const reconcileDeletes = options?.reconcileDeletes ?? true;
|
|
const jobRunId = options?.jobRunId;
|
|
const timeoutMs = options?.timeoutMs ?? 30 * 60 * 1000; // 30 minutes default
|
|
const cwDatabaseUrl = process.env.CW_DATABASE_URL || process.env.DATABASE_URL;
|
|
const apiDatabaseUrl = resolveApiDatabaseUrl();
|
|
|
|
if (!cwDatabaseUrl) {
|
|
throw new Error(
|
|
"Missing CW DB connection string. Set CW_DATABASE_URL or DATABASE_URL."
|
|
);
|
|
}
|
|
|
|
if (!apiDatabaseUrl) {
|
|
throw new Error(
|
|
"Missing API DB connection string. Set API_DATABASE_URL/OPTIMA_API_DATABASE_URL or ensure ../api/.env has DATABASE_URL."
|
|
);
|
|
}
|
|
|
|
const cwAdapter = new PrismaMssql(cwDatabaseUrl);
|
|
const apiAdapter = new PrismaPg({ connectionString: apiDatabaseUrl });
|
|
|
|
const cwPrisma = new CwPrismaClient({ adapter: cwAdapter });
|
|
const apiPrisma = new ApiPrismaClient({ adapter: apiAdapter });
|
|
|
|
const context = createTranslationContext();
|
|
|
|
// Helper to update job status in the database
|
|
const updateJobStatus = async (
|
|
status: "RUNNING" | "COMPLETED" | "FAILED" | "TIMED_OUT",
|
|
errorSummary?: string
|
|
) => {
|
|
if (!jobRunId) return;
|
|
try {
|
|
await apiPrisma.syncJobRun.update({
|
|
where: { id: jobRunId },
|
|
data: {
|
|
status,
|
|
...(status === "RUNNING" ? { startedAt: new Date() } : {}),
|
|
...(status === "COMPLETED" || status === "FAILED" || status === "TIMED_OUT"
|
|
? { completedAt: new Date() }
|
|
: {}),
|
|
...(errorSummary ? { errorSummary } : {}),
|
|
},
|
|
});
|
|
} catch (err) {
|
|
console.error("[sync] Failed to update job status:", err);
|
|
}
|
|
};
|
|
|
|
// Helper to write a step log
|
|
const writeStepLog = async (
|
|
tableName: string,
|
|
syncMode: string,
|
|
result: StepResult,
|
|
deleteResult: DeleteResult,
|
|
durationMs: number,
|
|
sampleErrors: string[] = []
|
|
) => {
|
|
if (!jobRunId) return;
|
|
try {
|
|
await apiPrisma.syncStepLog.create({
|
|
data: {
|
|
syncJobRunId: jobRunId,
|
|
tableName,
|
|
syncMode,
|
|
recordsProcessed: result.insertedOrUpdated + result.skipped + result.failed,
|
|
recordsInserted: result.insertedOrUpdated,
|
|
recordsSkipped: result.skipped,
|
|
recordsFailed: result.failed,
|
|
recordsDeleted: deleteResult.deleted,
|
|
sampleErrors: sampleErrors,
|
|
durationMs,
|
|
},
|
|
});
|
|
} catch (err) {
|
|
console.error("[sync] Failed to write step log:", err);
|
|
}
|
|
};
|
|
|
|
// Timeout tracking
|
|
const syncStartTime = Date.now();
|
|
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
|
|
|
|
const steps: Step[] = [
|
|
{
|
|
name: "CW Members",
|
|
sourceModel: "member",
|
|
targetModel: "cwMember",
|
|
translation: cwMemberTranslation as unknown as AnyTranslation,
|
|
uniqueField: "cwMemberId",
|
|
sourceIdField: "memberRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Users",
|
|
sourceModel: "member",
|
|
targetModel: "user",
|
|
translation: userTranslation as unknown as AnyTranslation,
|
|
uniqueField: "cwMemberId",
|
|
sourceIdField: "memberRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
targetDeleteWhere: {
|
|
cwMemberId: {
|
|
not: null,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "Companies",
|
|
sourceModel: "company",
|
|
targetModel: "company",
|
|
translation: companyTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "companyRecId",
|
|
sourceUpdatedField: "lastUpdate",
|
|
},
|
|
{
|
|
name: "Company Addresses",
|
|
sourceModel: "companyAddress",
|
|
targetModel: "companyAddress",
|
|
translation: companyAddressTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "companyAddressRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Contacts",
|
|
sourceModel: "contact",
|
|
targetModel: "contact",
|
|
translation: contactTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "contactRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Corporate Locations",
|
|
sourceModel: "ownerLevel",
|
|
targetModel: "corporateLocation",
|
|
translation: corporateLocationTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "ownerLevelRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Internal Departments",
|
|
sourceModel: "department",
|
|
targetModel: "internalDepartment",
|
|
translation: internalDepartmentTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "departmentRecId",
|
|
sourceUpdatedField: "lastUpdatedUTC",
|
|
},
|
|
{
|
|
name: "Catalog Item Types",
|
|
sourceModel: "productType",
|
|
targetModel: "catalogItemType",
|
|
translation: catalogItemTypeTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "typeRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Catalog Categories",
|
|
sourceModel: "productCategory",
|
|
targetModel: "catalogCategory",
|
|
translation: catalogCategoryTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "categoryRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Catalog Subcategories",
|
|
sourceModel: "productSubcategory",
|
|
targetModel: "catalogSubcategory",
|
|
translation: catalogSubcategoryTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "subcategoryRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Catalog Manufacturers",
|
|
sourceModel: "manufacturer",
|
|
targetModel: "catalogManufacturer",
|
|
translation: catalogManufacturerTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "manufacturerRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Warehouse Bins",
|
|
sourceModel: "warehouseBin",
|
|
targetModel: "warehouseBin",
|
|
translation: warehouseBinTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "warehouseBinRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Catalog Items",
|
|
sourceModel: "productCatalog",
|
|
targetModel: "catalogItem",
|
|
translation: catalogItemTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "catalogRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Product Inventory",
|
|
sourceModel: "productInventory",
|
|
targetModel: "productInventory",
|
|
translation: productInventoryTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "inventoryRecId",
|
|
sourceUpdatedField: "lastUpdate",
|
|
},
|
|
{
|
|
name: "Service Ticket Types",
|
|
sourceModel: "srType",
|
|
targetModel: "serviceTicketType",
|
|
translation: serviceTicketTypeTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srTypeRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Service Ticket Locations",
|
|
sourceModel: "srLocation",
|
|
targetModel: "serviceTicketLocation",
|
|
translation:
|
|
serviceTicketLocationTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srLocationRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Service Ticket Sources",
|
|
sourceModel: "srSource",
|
|
targetModel: "serviceTicketSource",
|
|
translation: serviceTicketSourceTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srSourceRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Service Ticket Impacts",
|
|
sourceModel: "srImpact",
|
|
targetModel: "serviceTicketImpact",
|
|
translation: serviceTicketImpactTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srImpactRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Service Ticket Severities",
|
|
sourceModel: "srSeverity",
|
|
targetModel: "serviceTicketSeverity",
|
|
translation:
|
|
serviceTicketSeverityTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srSeverityRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Service Ticket Priorities",
|
|
sourceModel: "srUrgency",
|
|
targetModel: "serviceTicketPriority",
|
|
translation:
|
|
serviceTicketPriorityTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srUrgencyRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Service Ticket Boards",
|
|
sourceModel: "srBoard",
|
|
targetModel: "serviceTicketBoard",
|
|
translation: serviceTicketBoardTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srBoardRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Tax Codes",
|
|
sourceModel: "taxCode",
|
|
targetModel: "taxCode",
|
|
translation: taxCodeTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "taxCodeRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
sourceArgs: {
|
|
include: {
|
|
levels: {
|
|
where: { taxXref: { not: null } },
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "Opportunity Stages",
|
|
sourceModel: "soPipeline",
|
|
targetModel: "opportunityStage",
|
|
translation: opportunityStageTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "soPipelineRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Opportunity Types",
|
|
sourceModel: "soType",
|
|
targetModel: "opportunityType",
|
|
translation: opportunityTypeTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "soTypeRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Opportunity Statuses",
|
|
sourceModel: "soOppStatus",
|
|
targetModel: "opportunityStatus",
|
|
translation: opportunityStatusTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "soOppStatusRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Opportunities",
|
|
sourceModel: "opportunity",
|
|
targetModel: "opportunity",
|
|
translation: opportunityTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "opportunityRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
sourceArgs: {
|
|
include: {
|
|
members: {
|
|
select: {
|
|
memberRecId: true,
|
|
primarySalesFlag: true,
|
|
secondarySalesFlag: true,
|
|
},
|
|
},
|
|
soOppStatus: {
|
|
select: {
|
|
closedFlag: true,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "Service Tickets",
|
|
sourceModel: "srService",
|
|
targetModel: "serviceTicket",
|
|
translation: serviceTicketTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "srServiceRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
},
|
|
{
|
|
name: "Product Data",
|
|
sourceModel: "iV_Product",
|
|
targetModel: "productData",
|
|
translation: productDataTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "ivProductRecId",
|
|
sourceUpdatedField: "lastUpdatedUTC",
|
|
},
|
|
{
|
|
name: "Service Ticket Notes",
|
|
sourceModel: "ticketNote",
|
|
targetModel: "serviceTicketNote",
|
|
translation: serviceTicketNoteTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "ticketNoteRecId",
|
|
sourceUpdatedField: "lastUpdatedUtc",
|
|
sourceArgs: {
|
|
orderBy: {
|
|
dateCreated: "asc",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "Schedule Statuses",
|
|
sourceModel: "scheduleStatus",
|
|
targetModel: "scheduleStatus",
|
|
translation: scheduleStatusTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "scheduleStatusRecId",
|
|
sourceUpdatedField: "lastUpdateUtc",
|
|
},
|
|
{
|
|
name: "Schedule Types",
|
|
sourceModel: "scheduleType",
|
|
targetModel: "scheduleType",
|
|
translation: scheduleTypeTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "scheduleTypeRecId",
|
|
sourceUpdatedField: "lastUpdateUtc",
|
|
},
|
|
{
|
|
name: "Schedule Spans",
|
|
sourceModel: "scheduleSpan",
|
|
targetModel: "scheduleSpan",
|
|
translation: scheduleSpanTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "scheduleSpanRecId",
|
|
sourceUpdatedField: "scheduleSpanRecId",
|
|
targetUpdatedField: null,
|
|
},
|
|
{
|
|
name: "Schedules",
|
|
sourceModel: "schedule",
|
|
targetModel: "schedule",
|
|
translation: scheduleTranslation as unknown as AnyTranslation,
|
|
uniqueField: "id",
|
|
sourceIdField: "scheduleRecId",
|
|
sourceUpdatedField: "lastUpdateUtc",
|
|
},
|
|
];
|
|
|
|
try {
|
|
await updateJobStatus("RUNNING");
|
|
|
|
try {
|
|
await refreshContextFromApi(apiPrisma, context);
|
|
await refreshCustomFieldContextFromCw(cwPrisma, context);
|
|
} catch (contextError) {
|
|
const errorMsg = contextError instanceof Error ? contextError.message : String(contextError);
|
|
console.error("[sync] Context refresh failed — aborting sync:", errorMsg);
|
|
await updateJobStatus("FAILED", `Context refresh failed: ${errorMsg}`);
|
|
return;
|
|
}
|
|
|
|
const summary: Array<{
|
|
step: string;
|
|
result: StepResult;
|
|
deleteResult: DeleteResult;
|
|
}> = [];
|
|
const summaryByStep = new Map<
|
|
string,
|
|
{
|
|
step: string;
|
|
result: StepResult;
|
|
deleteResult: DeleteResult;
|
|
}
|
|
>();
|
|
|
|
for (const step of steps) {
|
|
// Check timeout between steps
|
|
if (isTimedOut()) {
|
|
console.error(`[sync] Timeout exceeded (${timeoutMs}ms) — aborting at step ${step.name}`);
|
|
await updateJobStatus("TIMED_OUT", `Timed out at step: ${step.name}`);
|
|
return;
|
|
}
|
|
|
|
// Refresh context before tables that depend heavily on cross-table lookups.
|
|
if (
|
|
step.targetModel === "user" ||
|
|
step.targetModel === "contact" ||
|
|
step.targetModel === "serviceTicketBoard" ||
|
|
step.targetModel === "serviceTicket" ||
|
|
step.targetModel === "opportunity" ||
|
|
step.targetModel === "productData" ||
|
|
step.targetModel === "serviceTicketNote" ||
|
|
step.targetModel === "schedule"
|
|
) {
|
|
try {
|
|
await refreshContextFromApi(apiPrisma, context);
|
|
if (
|
|
step.targetModel === "opportunity" ||
|
|
step.targetModel === "productData"
|
|
) {
|
|
await refreshCustomFieldContextFromCw(cwPrisma, context);
|
|
}
|
|
} catch (contextError) {
|
|
const errorMsg = contextError instanceof Error ? contextError.message : String(contextError);
|
|
console.error(`[sync] Context refresh failed before ${step.name} — aborting:`, errorMsg);
|
|
await updateJobStatus("FAILED", `Context refresh failed before ${step.name}: ${errorMsg}`);
|
|
return;
|
|
}
|
|
}
|
|
|
|
const stepStart = Date.now();
|
|
console.log(`Starting ${step.name}...`);
|
|
const decision = await computeSmartSyncDecision(
|
|
cwPrisma,
|
|
apiPrisma,
|
|
step,
|
|
forceIncremental
|
|
);
|
|
const sourceIdsFilter =
|
|
decision.mode === "incremental" ? decision.sourceIds : undefined;
|
|
console.log(
|
|
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
|
decision.mode
|
|
}${
|
|
decision.mode === "incremental"
|
|
? ` (${decision.sourceIds.length} ids)`
|
|
: ""
|
|
}`
|
|
);
|
|
if (logAllDifferences) {
|
|
logAllSmartSyncDifferences(step, decision.differences);
|
|
}
|
|
const result = await syncStep(
|
|
cwPrisma,
|
|
apiPrisma,
|
|
context,
|
|
step,
|
|
sourceIdsFilter
|
|
);
|
|
const summaryItem = {
|
|
step: step.name,
|
|
result,
|
|
deleteResult: { deleted: 0, failed: 0 },
|
|
};
|
|
summary.push(summaryItem);
|
|
summaryByStep.set(step.name, summaryItem);
|
|
console.log(
|
|
`${step.name}: upserted=${result.insertedOrUpdated} skipped=${result.skipped} failed=${result.failed}`
|
|
);
|
|
|
|
await writeStepLog(
|
|
step.name,
|
|
decision.mode,
|
|
result,
|
|
{ deleted: 0, failed: 0 },
|
|
Date.now() - stepStart
|
|
);
|
|
}
|
|
|
|
if (reconcileDeletes) {
|
|
// Check timeout before delete reconciliation
|
|
if (isTimedOut()) {
|
|
console.error(`[sync] Timeout exceeded — skipping delete reconciliation`);
|
|
await updateJobStatus("TIMED_OUT", "Timed out before delete reconciliation");
|
|
return;
|
|
}
|
|
|
|
const deleteSteps = forceIncremental
|
|
? [steps[incrementalDeleteStepIndex % steps.length]]
|
|
: [...steps].reverse();
|
|
|
|
if (forceIncremental) {
|
|
const selected = deleteSteps[0];
|
|
console.log(
|
|
`[delete-reconcile] incremental sweep: ${selected.name} (${
|
|
incrementalDeleteStepIndex + 1
|
|
}/${steps.length})`
|
|
);
|
|
incrementalDeleteStepIndex =
|
|
(incrementalDeleteStepIndex + 1) % steps.length;
|
|
}
|
|
|
|
for (const step of deleteSteps) {
|
|
if (isTimedOut()) {
|
|
console.error(`[sync] Timeout exceeded during delete reconciliation at ${step.name}`);
|
|
await updateJobStatus("TIMED_OUT", `Timed out during delete reconciliation: ${step.name}`);
|
|
return;
|
|
}
|
|
|
|
console.log(`Reconciling deletes for ${step.name}...`);
|
|
const deleteResult = await reconcileStepDeletes(
|
|
cwPrisma,
|
|
apiPrisma,
|
|
step
|
|
);
|
|
const summaryItem = summaryByStep.get(step.name);
|
|
if (summaryItem) {
|
|
summaryItem.deleteResult = deleteResult;
|
|
}
|
|
console.log(
|
|
`${step.name}: deleted=${deleteResult.deleted} deleteFailed=${deleteResult.failed}`
|
|
);
|
|
}
|
|
}
|
|
|
|
console.log("\nSync complete.\n");
|
|
for (const item of summary) {
|
|
console.log(
|
|
`${item.step}: upserted=${item.result.insertedOrUpdated}, skipped=${item.result.skipped}, failed=${item.result.failed}, deleted=${item.deleteResult.deleted}, deleteFailed=${item.deleteResult.failed}`
|
|
);
|
|
}
|
|
|
|
await updateJobStatus("COMPLETED");
|
|
} catch (error) {
|
|
const errorMsg = error instanceof Error ? error.message : String(error);
|
|
console.error("[sync] Unhandled sync error:", errorMsg);
|
|
// Try to write the error to the job run
|
|
if (jobRunId) {
|
|
try {
|
|
await apiPrisma.syncJobRun.update({
|
|
where: { id: jobRunId },
|
|
data: {
|
|
status: "FAILED",
|
|
completedAt: new Date(),
|
|
errorSummary: errorMsg.slice(0, 2000),
|
|
},
|
|
});
|
|
} catch {
|
|
// Best-effort status update
|
|
}
|
|
}
|
|
throw error;
|
|
} finally {
|
|
await cwPrisma.$disconnect();
|
|
await apiPrisma.$disconnect();
|
|
}
|
|
};
|
|
|
|
export const executeForcedIncrementalDalpuriSync = async (options?: {
|
|
jobRunId?: string;
|
|
}): Promise<void> => {
|
|
return executeFullDalpuriSync({
|
|
forceIncremental: true,
|
|
logAllDifferences: false,
|
|
jobRunId: options?.jobRunId,
|
|
});
|
|
};
|
|
|
|
if (import.meta.main) {
|
|
executeFullDalpuriSync()
|
|
.then(() => {
|
|
process.exit(0);
|
|
})
|
|
.catch((error) => {
|
|
console.error("CW -> API sync failed:", error);
|
|
process.exit(1);
|
|
});
|
|
}
|