Compare commits

...

8 Commits

Author SHA1 Message Date
HoloPanio 86d7426e8b fix(sync): harden incremental observability and periodic reconciliation 2026-04-10 04:36:36 +00:00
HoloPanio afe56393e7 fix(sync): restore worker incremental API DB resolution 2026-04-10 04:07:27 +00:00
HoloPanio b2cd26af30 fix(release): unblock deploy workflow image build and desktop rebuild 2026-04-10 03:44:33 +00:00
HoloPanio 0594816ea4 fix(api): include pdfmake Roboto fonts in runtime image 2026-04-10 03:00:31 +00:00
HoloPanio 71fe36c0b8 fix(worker): restore reliable 5s incremental sync cadence 2026-04-10 01:00:04 +00:00
HoloPanio e0d575454e fix(dalpuri): sync CW Members before Users to resolve FK ordering issue
User rows have a FK constraint to CwMember (User_cwMemberId_fkey). Syncing
Users first caused all 140 User upserts to fail since the CwMember table was
empty. This cascade failure then caused all Opportunity upserts to fail because
Opportunity.primarySalesRepId is FK-constrained to User.cwIdentifier.

Fix: reorder steps so CW Members syncs first, then Users.
2026-04-09 01:04:00 +00:00
HoloPanio 32bba31e72 fix(dalpuri): populate locationId and fix closedFlag on opportunities
- Add ownerLevelRecId -> locationId mapping to opportunity translation
- Include soOppStatus in opportunity query and derive closedFlag from
  status.closedFlag (with fallback to legacy oldCloseFlag field)
- Add locationId sanitization guard in both sync.ts and sync-by-table.ts

Note: departmentId is not available in CW SO_Opportunity table and
remains null for synced records.
2026-04-09 00:22:41 +00:00
HoloPanio 1233535b20 fix(dalpuri): populate userIdentifiersByMemberRecId from CwMember table
When no User accounts have cwMemberId linked, the context map was empty
and all opportunities got primarySalesRepId = null. Now also populate
the map from CwMember rows directly (User-linked entries take precedence),
so rep identifiers resolve correctly regardless of user account linkage.
2026-04-08 23:23:51 +00:00
12 changed files with 276 additions and 24 deletions
+4 -2
View File
@@ -231,9 +231,10 @@ jobs:
run: bun install --frozen-lockfile run: bun install --frozen-lockfile
- name: Rebuild native modules - name: Rebuild native modules
run: npm rebuild run: npm rebuild --ignore-scripts
env: env:
HUSKY: "0" HUSKY: "0"
HUSKY_SKIP_INSTALL: "1"
- name: Build macOS distributables - name: Build macOS distributables
run: bun run make:macos run: bun run make:macos
@@ -272,9 +273,10 @@ jobs:
run: bun install --frozen-lockfile run: bun install --frozen-lockfile
- name: Rebuild native modules - name: Rebuild native modules
run: npm rebuild run: npm rebuild --ignore-scripts
env: env:
HUSKY: "0" HUSKY: "0"
HUSKY_SKIP_INSTALL: "1"
- name: Build Windows distributables - name: Build Windows distributables
run: bun run make -- --platform win32 run: bun run make -- --platform win32
+3
View File
@@ -90,6 +90,9 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
# Copy production node_modules (Prisma adapter needs native bindings) # Copy production node_modules (Prisma adapter needs native bindings)
COPY --from=deps /app/node_modules/ ./node_modules/ COPY --from=deps /app/node_modules/ ./node_modules/
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
ENV NODE_ENV=production ENV NODE_ENV=production
# ---- Stage 4: API server runtime image ---- # ---- Stage 4: API server runtime image ----
+5
View File
@@ -20,6 +20,11 @@ spec:
env: env:
- name: MANAGER_SOCKET_URL - name: MANAGER_SOCKET_URL
value: "http://optima-api.optima.svc.cluster.local:8671" value: "http://optima-api.optima.svc.cluster.local:8671"
- name: API_DATABASE_URL
valueFrom:
secretKeyRef:
name: api-env-secret
key: DATABASE_URL
envFrom: envFrom:
- secretRef: - secretRef:
name: api-env-secret name: api-env-secret
+21 -2
View File
@@ -1,5 +1,5 @@
import PdfPrinter from "pdfmake/src/Printer"; import PdfPrinter from "pdfmake/src/Printer";
import { readFileSync } from "node:fs"; import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
export interface QuoteLineItem { export interface QuoteLineItem {
@@ -110,7 +110,26 @@ const COMPANY = {
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png"); const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"); function resolveRobotoFontDir(): string {
const candidates = [
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
];
for (const dir of candidates) {
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
return dir;
}
}
throw new Error(
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
);
}
const fontDir = resolveRobotoFontDir();
const fonts = { const fonts = {
Roboto: { Roboto: {
normal: join(fontDir, "Roboto-Regular.ttf"), normal: join(fontDir, "Roboto-Regular.ttf"),
+1
View File
@@ -1,6 +1,7 @@
import { Server } from "socket.io"; import { Server } from "socket.io";
import { events, EventTypes } from "../globalEvents"; import { events, EventTypes } from "../globalEvents";
import { WorkerQueue } from "./queues"; import { WorkerQueue } from "./queues";
import { reserveWorkerId } from "../../workert";
function emitGlobalEvent<K extends keyof EventTypes>( function emitGlobalEvent<K extends keyof EventTypes>(
name: K, name: K,
+50 -1
View File
@@ -1,5 +1,6 @@
import { Socket } from "socket.io-client"; import { Socket } from "socket.io-client";
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri"; import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
import { prisma } from "../../constants";
/** /**
* Execute a full sync from Dalpuri (ConnectWise) to the API database. * Execute a full sync from Dalpuri (ConnectWise) to the API database.
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
* Called every 5 seconds via PgBoss from the API process interval. * Called every 5 seconds via PgBoss from the API process interval.
*/ */
export async function executeIncrementalSync(): Promise<void> { export async function executeIncrementalSync(): Promise<void> {
return executeForcedIncrementalDalpuriSync(); let jobRunId: string | undefined;
try {
const run = await prisma.syncJobRun.create({
data: {
jobType: "INCREMENTAL_SYNC",
status: "RUNNING",
triggeredBy: "worker",
startedAt: new Date(),
},
select: { id: true },
});
jobRunId = run.id;
} catch (err) {
// Sync should still run even if tracking insert fails.
console.error("[sync] Failed to create incremental SyncJobRun", err);
}
try {
await executeForcedIncrementalDalpuriSync({ jobRunId });
if (jobRunId) {
await prisma.syncJobRun.update({
where: { id: jobRunId },
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});
}
} catch (err) {
if (jobRunId) {
const errorSummary = err instanceof Error ? err.message : String(err);
await prisma.syncJobRun
.update({
where: { id: jobRunId },
data: {
status: "FAILED",
completedAt: new Date(),
errorSummary: errorSummary.slice(0, 2000),
},
})
.catch(() => {
// Best-effort update only.
});
}
throw err;
}
} }
+13 -1
View File
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
* Called on an interval from the main API process so it survives worker restarts. * Called on an interval from the main API process so it survives worker restarts.
*/ */
export async function enqueueIncrementalSync(): Promise<void> { export async function enqueueIncrementalSync(): Promise<void> {
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {}); const jobId = await getBoss().send(
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
{
enqueuedAt: new Date().toISOString(),
},
{
singletonKey: "dalpuri-incremental-sync",
}
);
if (!jobId) {
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
}
} }
+3 -1
View File
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
queueType: WorkerQueue, queueType: WorkerQueue,
workFn: (workerSocket: Socket) => Promise<T>, workFn: (workerSocket: Socket) => Promise<T>,
): Promise<T> { ): Promise<T> {
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
// Request a worker ID and namespace from the manager // Request a worker ID and namespace from the manager
socket.emit( socket.emit(
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
} }
// Connect to the worker-specific namespace // Connect to the worker-specific namespace
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, { const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
reconnection: false, reconnection: false,
}); });
+29 -1
View File
@@ -163,6 +163,7 @@ if (import.meta.main) {
// Register job handler for DALPURI_FULL_SYNC // Register job handler for DALPURI_FULL_SYNC
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
const socket = await ensureManagerSocketReady(); const socket = await ensureManagerSocketReady();
await enqueueDalpuriFullSync(socket); await enqueueDalpuriFullSync(socket);
@@ -170,10 +171,37 @@ if (import.meta.main) {
console.log("[worker] Registered DALPURI_FULL_SYNC job handler"); console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
await executeIncrementalSync(); const startedAt = Date.now();
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
try {
await executeIncrementalSync();
console.log(
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
);
} catch (err) {
console.error(
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
err
);
throw err;
}
}); });
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler"); console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
const enqueueIncrementalWithLogging = () => {
enqueueIncrementalSync().catch((err) => {
console.error(
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
);
});
};
// Keep a worker-local 5s scheduler so incremental sync continues even when
// API interval scheduling is unavailable.
enqueueIncrementalWithLogging();
setInterval(enqueueIncrementalWithLogging, 5_000);
console.log("[worker] Started 5-second incremental enqueue interval");
// Register job handler for REFRESH_SALES_METRICS // Register job handler for REFRESH_SALES_METRICS
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics"); const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => { await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
+27
View File
@@ -294,6 +294,22 @@ const refreshContextFromApi = async (
} }
} }
const cwMembers = await apiPrisma.cwMember.findMany({
select: { cwMemberId: true, identifier: true },
});
for (const member of cwMembers) {
if (
member.cwMemberId != null &&
member.identifier &&
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
) {
context.userIdentifiersByMemberRecId.set(
member.cwMemberId,
member.identifier
);
}
}
for (const board of boards) { for (const board of boards) {
context.serviceTicketBoardUidsById.set(board.id, board.uid); context.serviceTicketBoardUidsById.set(board.id, board.uid);
} }
@@ -426,6 +442,12 @@ const sanitizeModelData = (
) { ) {
sanitized.statusId = null; sanitized.statusId = null;
} }
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
} }
if (targetModel === "schedule") { if (targetModel === "schedule") {
@@ -734,6 +756,11 @@ const getConfigForTable = (table: string): SyncTableConfig | null => {
secondarySalesFlag: true, secondarySalesFlag: true,
}, },
}, },
soOppStatus: {
select: {
closedFlag: true,
},
},
}, },
}, },
}, },
+110 -15
View File
@@ -75,6 +75,43 @@ type DeleteResult = {
let incrementalDeleteStepIndex = 0; let incrementalDeleteStepIndex = 0;
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
]);
const criticalFullSyncIntervalMinutes = Math.max(
1,
Number.parseInt(
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60",
10
) || 60
);
const CRITICAL_FULL_SYNC_INTERVAL_MS =
criticalFullSyncIntervalMinutes * 60 * 1000;
const lastCriticalFullSyncByStep = new Map<string, number>();
const shouldForceCriticalFullSync = (
step: Step,
forceIncremental: boolean
): boolean => {
if (!forceIncremental) return false;
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
const now = Date.now();
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
return false;
}
lastCriticalFullSyncByStep.set(step.name, now);
return true;
};
const parseEnvFile = (path: string): Record<string, string> => { const parseEnvFile = (path: string): Record<string, string> => {
const envData = readFileSync(path, "utf8"); const envData = readFileSync(path, "utf8");
const out: Record<string, string> = {}; const out: Record<string, string> = {};
@@ -107,6 +144,20 @@ const resolveApiDatabaseUrl = (): string => {
if (process.env.OPTIMA_API_DATABASE_URL) if (process.env.OPTIMA_API_DATABASE_URL)
return process.env.OPTIMA_API_DATABASE_URL; return process.env.OPTIMA_API_DATABASE_URL;
// Worker/runtime fallback:
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
return process.env.DATABASE_URL;
}
if (
process.env.DATABASE_URL &&
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
) {
return process.env.DATABASE_URL;
}
const candidates = [ const candidates = [
resolve(import.meta.dir, "../../api/.env"), resolve(import.meta.dir, "../../api/.env"),
resolve(process.cwd(), "../api/.env"), resolve(process.cwd(), "../api/.env"),
@@ -323,6 +374,22 @@ const refreshContextFromApi = async (
} }
} }
const cwMembers = await apiPrisma.cwMember.findMany({
select: { cwMemberId: true, identifier: true },
});
for (const member of cwMembers) {
if (
member.cwMemberId != null &&
member.identifier &&
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
) {
context.userIdentifiersByMemberRecId.set(
member.cwMemberId,
member.identifier
);
}
}
for (const board of boards) { for (const board of boards) {
context.serviceTicketBoardUidsById.set(board.id, board.uid); context.serviceTicketBoardUidsById.set(board.id, board.uid);
} }
@@ -636,6 +703,13 @@ const sanitizeModelData = (
) { ) {
sanitized.stageId = null; sanitized.stageId = null;
} }
// Nullify locationId if the corporate location doesn't exist
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
// Nullify taxCodeId if the tax code hasn't synced yet // Nullify taxCodeId if the tax code hasn't synced yet
if ( if (
sanitized.taxCodeId != null && sanitized.taxCodeId != null &&
@@ -1328,6 +1402,15 @@ export const executeFullDalpuriSync = async (options?: {
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs; const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
const steps: Step[] = [ const steps: Step[] = [
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Users", name: "Users",
sourceModel: "member", sourceModel: "member",
@@ -1342,15 +1425,6 @@ export const executeFullDalpuriSync = async (options?: {
}, },
}, },
}, },
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Companies", name: "Companies",
sourceModel: "company", sourceModel: "company",
@@ -1585,6 +1659,11 @@ export const executeFullDalpuriSync = async (options?: {
secondarySalesFlag: true, secondarySalesFlag: true,
}, },
}, },
soOppStatus: {
select: {
closedFlag: true,
},
},
}, },
}, },
}, },
@@ -1729,19 +1808,35 @@ export const executeFullDalpuriSync = async (options?: {
step, step,
forceIncremental forceIncremental
); );
const forceCriticalFullSync = shouldForceCriticalFullSync(
step,
forceIncremental
);
const effectiveDecision = forceCriticalFullSync
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
: decision;
if (forceCriticalFullSync) {
console.log(
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
);
}
const sourceIdsFilter = const sourceIdsFilter =
decision.mode === "incremental" ? decision.sourceIds : undefined; effectiveDecision.mode === "incremental"
? effectiveDecision.sourceIds
: undefined;
console.log( console.log(
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${ ` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
decision.mode effectiveDecision.mode
}${ }${
decision.mode === "incremental" effectiveDecision.mode === "incremental"
? ` (${decision.sourceIds.length} ids)` ? ` (${effectiveDecision.sourceIds.length} ids)`
: "" : ""
}` }`
); );
if (logAllDifferences) { if (logAllDifferences) {
logAllSmartSyncDifferences(step, decision.differences); logAllSmartSyncDifferences(step, effectiveDecision.differences);
} }
const result = await syncStep( const result = await syncStep(
cwPrisma, cwPrisma,
@@ -1763,7 +1858,7 @@ export const executeFullDalpuriSync = async (options?: {
await writeStepLog( await writeStepLog(
step.name, step.name,
decision.mode, effectiveDecision.mode,
result, result,
{ deleted: 0, failed: 0 }, { deleted: 0, failed: 0 },
Date.now() - stepStart Date.now() - stepStart
+10 -1
View File
@@ -1,6 +1,7 @@
import { import {
Opportunity as CwOpportunity, Opportunity as CwOpportunity,
OpportunityMember as CwOpportunityMember, OpportunityMember as CwOpportunityMember,
SoOppStatus as CwSoOppStatus,
} from "../../generated/prisma/client"; } from "../../generated/prisma/client";
import { OpportunityInterest } from "../../../api/generated/prisma/client"; import { OpportunityInterest } from "../../../api/generated/prisma/client";
import { Translation, skipRow } from "./types"; import { Translation, skipRow } from "./types";
@@ -30,6 +31,7 @@ type ApiOpportunityRecord = {
dateBecameLead?: Date | null; dateBecameLead?: Date | null;
closedDate?: Date | null; closedDate?: Date | null;
closedFlag: boolean; closedFlag: boolean;
locationId?: number | null;
closedById?: string | null; closedById?: string | null;
updatedBy: string; updatedBy: string;
eneteredBy: string; eneteredBy: string;
@@ -42,6 +44,7 @@ type CwOpportunityWithMembers = CwOpportunity & {
CwOpportunityMember, CwOpportunityMember,
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag" "memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
>[]; >[];
soOppStatus?: Pick<CwSoOppStatus, "closedFlag"> | null;
}; };
const toInterest = (value: number | null): OpportunityInterest | null => { const toInterest = (value: number | null): OpportunityInterest | null => {
@@ -119,13 +122,19 @@ export const opportunityTranslation: Translation<
}, },
{ from: "companyRecId", to: "companyId" }, { from: "companyRecId", to: "companyId" },
{ from: "contactRecId", to: "contactId" }, { from: "contactRecId", to: "contactId" },
{ from: "ownerLevelRecId", to: "locationId" },
{ from: "companyAddressRecId", to: "siteId" }, { from: "companyAddressRecId", to: "siteId" },
{ from: "poNumber", to: "customerPO" }, { from: "poNumber", to: "customerPO" },
{ from: "dateCloseExpected", to: "expectedCloseDate" }, { from: "dateCloseExpected", to: "expectedCloseDate" },
{ from: "datePipelineChange", to: "pipelineChangeDate" }, { from: "datePipelineChange", to: "pipelineChangeDate" },
{ from: "dateBecameLead", to: "dateBecameLead" }, { from: "dateBecameLead", to: "dateBecameLead" },
{ from: "dateClosed", to: "closedDate" }, { from: "dateClosed", to: "closedDate" },
{ from: "oldCloseFlag", to: "closedFlag" }, {
from: "oldCloseFlag",
to: "closedFlag",
process: (_value, _context, row) =>
row.soOppStatus?.closedFlag ?? row.oldCloseFlag ?? false,
},
{ from: "closedBy", to: "closedById" }, { from: "closedBy", to: "closedById" },
{ {
from: "updatedBy", from: "updatedBy",