Compare commits

..

4 Commits

Author SHA1 Message Date
HoloPanio 0594816ea4 fix(api): include pdfmake Roboto fonts in runtime image 2026-04-10 03:00:31 +00:00
HoloPanio 71fe36c0b8 fix(worker): restore reliable 5s incremental sync cadence 2026-04-10 01:00:04 +00:00
HoloPanio e0d575454e fix(dalpuri): sync CW Members before Users to resolve FK ordering issue
User rows have a FK constraint to CwMember (User_cwMemberId_fkey). Syncing
Users first caused all 140 User upserts to fail since the CwMember table was
empty. This cascade failure then caused all Opportunity upserts to fail because
Opportunity.primarySalesRepId is FK-constrained to User.cwIdentifier.

Fix: reorder steps so CW Members syncs first, then Users.
2026-04-09 01:04:00 +00:00
HoloPanio 32bba31e72 fix(dalpuri): populate locationId and fix closedFlag on opportunities
- Add ownerLevelRecId -> locationId mapping to opportunity translation
- Include soOppStatus in opportunity query and derive closedFlag from
  status.closedFlag (with fallback to legacy oldCloseFlag field)
- Add locationId sanitization guard in both sync.ts and sync-by-table.ts

Note: departmentId is not available in CW SO_Opportunity table and
remains null for synced records.
2026-04-09 00:22:41 +00:00
9 changed files with 112 additions and 15 deletions
+3
View File
@@ -90,6 +90,9 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
# Copy production node_modules (Prisma adapter needs native bindings) # Copy production node_modules (Prisma adapter needs native bindings)
COPY --from=deps /app/node_modules/ ./node_modules/ COPY --from=deps /app/node_modules/ ./node_modules/
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
COPY --from=build /app/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
ENV NODE_ENV=production ENV NODE_ENV=production
# ---- Stage 4: API server runtime image ---- # ---- Stage 4: API server runtime image ----
+21 -2
View File
@@ -1,5 +1,5 @@
import PdfPrinter from "pdfmake/src/Printer"; import PdfPrinter from "pdfmake/src/Printer";
import { readFileSync } from "node:fs"; import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
export interface QuoteLineItem { export interface QuoteLineItem {
@@ -110,7 +110,26 @@ const COMPANY = {
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png"); const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"); function resolveRobotoFontDir(): string {
const candidates = [
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
];
for (const dir of candidates) {
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
return dir;
}
}
throw new Error(
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
);
}
const fontDir = resolveRobotoFontDir();
const fonts = { const fonts = {
Roboto: { Roboto: {
normal: join(fontDir, "Roboto-Regular.ttf"), normal: join(fontDir, "Roboto-Regular.ttf"),
+1
View File
@@ -1,6 +1,7 @@
import { Server } from "socket.io"; import { Server } from "socket.io";
import { events, EventTypes } from "../globalEvents"; import { events, EventTypes } from "../globalEvents";
import { WorkerQueue } from "./queues"; import { WorkerQueue } from "./queues";
import { reserveWorkerId } from "../../workert";
function emitGlobalEvent<K extends keyof EventTypes>( function emitGlobalEvent<K extends keyof EventTypes>(
name: K, name: K,
+13 -1
View File
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
* Called on an interval from the main API process so it survives worker restarts. * Called on an interval from the main API process so it survives worker restarts.
*/ */
export async function enqueueIncrementalSync(): Promise<void> { export async function enqueueIncrementalSync(): Promise<void> {
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {}); const jobId = await getBoss().send(
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
{
enqueuedAt: new Date().toISOString(),
},
{
singletonKey: "dalpuri-incremental-sync",
}
);
if (!jobId) {
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
}
} }
+3 -1
View File
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
queueType: WorkerQueue, queueType: WorkerQueue,
workFn: (workerSocket: Socket) => Promise<T>, workFn: (workerSocket: Socket) => Promise<T>,
): Promise<T> { ): Promise<T> {
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
// Request a worker ID and namespace from the manager // Request a worker ID and namespace from the manager
socket.emit( socket.emit(
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
} }
// Connect to the worker-specific namespace // Connect to the worker-specific namespace
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, { const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
reconnection: false, reconnection: false,
}); });
+29 -1
View File
@@ -163,6 +163,7 @@ if (import.meta.main) {
// Register job handler for DALPURI_FULL_SYNC // Register job handler for DALPURI_FULL_SYNC
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
const socket = await ensureManagerSocketReady(); const socket = await ensureManagerSocketReady();
await enqueueDalpuriFullSync(socket); await enqueueDalpuriFullSync(socket);
@@ -170,10 +171,37 @@ if (import.meta.main) {
console.log("[worker] Registered DALPURI_FULL_SYNC job handler"); console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
await executeIncrementalSync(); const startedAt = Date.now();
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
try {
await executeIncrementalSync();
console.log(
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
);
} catch (err) {
console.error(
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
err
);
throw err;
}
}); });
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler"); console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
const enqueueIncrementalWithLogging = () => {
enqueueIncrementalSync().catch((err) => {
console.error(
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
);
});
};
// Keep a worker-local 5s scheduler so incremental sync continues even when
// API interval scheduling is unavailable.
enqueueIncrementalWithLogging();
setInterval(enqueueIncrementalWithLogging, 5_000);
console.log("[worker] Started 5-second incremental enqueue interval");
// Register job handler for REFRESH_SALES_METRICS // Register job handler for REFRESH_SALES_METRICS
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics"); const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => { await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
+11
View File
@@ -442,6 +442,12 @@ const sanitizeModelData = (
) { ) {
sanitized.statusId = null; sanitized.statusId = null;
} }
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
} }
if (targetModel === "schedule") { if (targetModel === "schedule") {
@@ -750,6 +756,11 @@ const getConfigForTable = (table: string): SyncTableConfig | null => {
secondarySalesFlag: true, secondarySalesFlag: true,
}, },
}, },
soOppStatus: {
select: {
closedFlag: true,
},
},
}, },
}, },
}, },
+21 -9
View File
@@ -652,6 +652,13 @@ const sanitizeModelData = (
) { ) {
sanitized.stageId = null; sanitized.stageId = null;
} }
// Nullify locationId if the corporate location doesn't exist
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
// Nullify taxCodeId if the tax code hasn't synced yet // Nullify taxCodeId if the tax code hasn't synced yet
if ( if (
sanitized.taxCodeId != null && sanitized.taxCodeId != null &&
@@ -1344,6 +1351,15 @@ export const executeFullDalpuriSync = async (options?: {
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs; const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
const steps: Step[] = [ const steps: Step[] = [
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Users", name: "Users",
sourceModel: "member", sourceModel: "member",
@@ -1358,15 +1374,6 @@ export const executeFullDalpuriSync = async (options?: {
}, },
}, },
}, },
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Companies", name: "Companies",
sourceModel: "company", sourceModel: "company",
@@ -1601,6 +1608,11 @@ export const executeFullDalpuriSync = async (options?: {
secondarySalesFlag: true, secondarySalesFlag: true,
}, },
}, },
soOppStatus: {
select: {
closedFlag: true,
},
},
}, },
}, },
}, },
+10 -1
View File
@@ -1,6 +1,7 @@
import { import {
Opportunity as CwOpportunity, Opportunity as CwOpportunity,
OpportunityMember as CwOpportunityMember, OpportunityMember as CwOpportunityMember,
SoOppStatus as CwSoOppStatus,
} from "../../generated/prisma/client"; } from "../../generated/prisma/client";
import { OpportunityInterest } from "../../../api/generated/prisma/client"; import { OpportunityInterest } from "../../../api/generated/prisma/client";
import { Translation, skipRow } from "./types"; import { Translation, skipRow } from "./types";
@@ -30,6 +31,7 @@ type ApiOpportunityRecord = {
dateBecameLead?: Date | null; dateBecameLead?: Date | null;
closedDate?: Date | null; closedDate?: Date | null;
closedFlag: boolean; closedFlag: boolean;
locationId?: number | null;
closedById?: string | null; closedById?: string | null;
updatedBy: string; updatedBy: string;
eneteredBy: string; eneteredBy: string;
@@ -42,6 +44,7 @@ type CwOpportunityWithMembers = CwOpportunity & {
CwOpportunityMember, CwOpportunityMember,
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag" "memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
>[]; >[];
soOppStatus?: Pick<CwSoOppStatus, "closedFlag"> | null;
}; };
const toInterest = (value: number | null): OpportunityInterest | null => { const toInterest = (value: number | null): OpportunityInterest | null => {
@@ -119,13 +122,19 @@ export const opportunityTranslation: Translation<
}, },
{ from: "companyRecId", to: "companyId" }, { from: "companyRecId", to: "companyId" },
{ from: "contactRecId", to: "contactId" }, { from: "contactRecId", to: "contactId" },
{ from: "ownerLevelRecId", to: "locationId" },
{ from: "companyAddressRecId", to: "siteId" }, { from: "companyAddressRecId", to: "siteId" },
{ from: "poNumber", to: "customerPO" }, { from: "poNumber", to: "customerPO" },
{ from: "dateCloseExpected", to: "expectedCloseDate" }, { from: "dateCloseExpected", to: "expectedCloseDate" },
{ from: "datePipelineChange", to: "pipelineChangeDate" }, { from: "datePipelineChange", to: "pipelineChangeDate" },
{ from: "dateBecameLead", to: "dateBecameLead" }, { from: "dateBecameLead", to: "dateBecameLead" },
{ from: "dateClosed", to: "closedDate" }, { from: "dateClosed", to: "closedDate" },
{ from: "oldCloseFlag", to: "closedFlag" }, {
from: "oldCloseFlag",
to: "closedFlag",
process: (_value, _context, row) =>
row.soOppStatus?.closedFlag ?? row.oldCloseFlag ?? false,
},
{ from: "closedBy", to: "closedById" }, { from: "closedBy", to: "closedById" },
{ {
from: "updatedBy", from: "updatedBy",