Compare commits

...

28 Commits

Author SHA1 Message Date
HoloPanio a8c48e8c75 fix: correct prisma client import path in setup-admin 2026-04-14 03:18:58 +00:00
HoloPanio 051edb5f78 chore: add setup-admin script 2026-04-14 03:06:39 +00:00
HoloPanio f87f6dd336 chore: add setup-admin dockerfile stage 2026-04-14 02:52:51 +00:00
HoloPanio 2eb387811d fix(worker): break circular import by extracting PgBoss singleton
incremental-sync.ts and api/cw/sync.ts imported getBoss() from workert.ts.
When workert.ts (the entry point) dynamically imported incremental-sync.ts,
it triggered a circular module re-evaluation that hung indefinitely.

Extract the PgBoss singleton and getBoss() factory to a new boss-instance.ts
module that neither has top-level async side-effects nor imports from
workert.ts. All consumers (workert.ts, index.ts, incremental-sync.ts,
cw/sync.ts) now import from boss-instance.ts instead.
2026-04-14 00:34:33 +00:00
HoloPanio db27c9224d fix(worker): add granular debug logging to isolate startup hang
Add console.log before/after each createQueue() call and dynamic
import to pinpoint exactly where the worker startup is blocking.
2026-04-14 00:12:20 +00:00
HoloPanio 7f6e6fdfbc fix(worker): add PgBoss startup timeouts and debug logging
- Add statement_timeout=30000ms to PgBoss connection URL to prevent
  SQL queries from hanging indefinitely
- Add connectionTimeoutMillis=15s to PgBoss config for connection timeout
- Wrap boss.start() in 30s Promise.race timeout with process.exit(1)
  on failure to ensure container restarts instead of hanging silently
- Add debug logging around PgBoss startup to diagnose connection issues
2026-04-13 23:53:32 +00:00
HoloPanio 5f5f610060 fix: remove prisma/config import; use plain export in prisma.config.ts 2026-04-13 21:35:34 +00:00
HoloPanio 809841d672 fix: add url = env(DATABASE_URL) to prisma schema datasource 2026-04-13 21:31:43 +00:00
HoloPanio 276eb563bf fix: remove prisma.config.ts from runtime image (use defaults) 2026-04-13 21:26:40 +00:00
HoloPanio 7624ba0bc0 fix: add bunx symlink to runtime Docker image 2026-04-13 21:18:01 +00:00
HoloPanio 1063231107 chore: update bun.lock (@types/bun 1.3.11 -> 1.3.12) 2026-04-13 21:11:25 +00:00
Jackson 2cd5dee612 Merge pull request #3 from HorizonStackSoftware/copilot/remove-prisma-script
Replace migrate-entrypoint.sh with direct Prisma commands in Dockerfile
2026-04-12 10:37:33 -05:00
copilot-swe-agent[bot] 8ac1cbaf3e chore: replace migrate-entrypoint.sh with direct prisma commands in Dockerfile
Agent-Logs-Url: https://github.com/HorizonStackSoftware/optima/sessions/eb8e2182-3a0d-4a9c-ad4f-4d1d9cf8a923

Co-authored-by: HoloPanio <30759238+HoloPanio@users.noreply.github.com>
2026-04-12 15:34:53 +00:00
Jackson bd7e6a37cd Merge pull request #2 from HorizonStackSoftware/copilot/remove-frozen-lockfile-params
Remove --frozen-lockfile from test workflows
2026-04-12 09:44:53 -05:00
copilot-swe-agent[bot] 4e0799f9d9 Remove --frozen-lockfile from test workflow files
Agent-Logs-Url: https://github.com/HorizonStackSoftware/optima/sessions/8b3e4db9-a1bf-44c4-98fc-3304890cb3f4

Co-authored-by: HoloPanio <30759238+HoloPanio@users.noreply.github.com>
2026-04-12 14:41:14 +00:00
Jackson 223a06ba27 Merge pull request #1 from HorizonStackSoftware/copilot/add-post-build-command-for-migration
Run Prisma migrations automatically on API container startup
2026-04-12 09:38:45 -05:00
copilot-swe-agent[bot] 503657d168 feat: run prisma migrate deploy on api container startup
Agent-Logs-Url: https://github.com/HorizonStackSoftware/optima/sessions/509d6156-c474-457b-9627-82f7b2f13158

Co-authored-by: HoloPanio <30759238+HoloPanio@users.noreply.github.com>
2026-04-12 14:34:58 +00:00
Jackson cf68e281e8 Update bun install commands in Dockerfile
Removed the --frozen-lockfile option from bun install commands in the Dockerfile.
2026-04-12 09:21:27 -05:00
HoloPanio 57b5763d41 fix(opportunity): remove synthetic Contact suffix in contact field 2026-04-10 05:09:40 +00:00
HoloPanio 2bd498a35d fix(sync): use CW watermark incremental path for critical tables 2026-04-10 04:53:57 +00:00
HoloPanio 86d7426e8b fix(sync): harden incremental observability and periodic reconciliation 2026-04-10 04:36:36 +00:00
HoloPanio afe56393e7 fix(sync): restore worker incremental API DB resolution 2026-04-10 04:07:27 +00:00
HoloPanio b2cd26af30 fix(release): unblock deploy workflow image build and desktop rebuild 2026-04-10 03:44:33 +00:00
HoloPanio 0594816ea4 fix(api): include pdfmake Roboto fonts in runtime image 2026-04-10 03:00:31 +00:00
HoloPanio 71fe36c0b8 fix(worker): restore reliable 5s incremental sync cadence 2026-04-10 01:00:04 +00:00
HoloPanio e0d575454e fix(dalpuri): sync CW Members before Users to resolve FK ordering issue
User rows have a FK constraint to CwMember (User_cwMemberId_fkey). Syncing
Users first caused all 140 User upserts to fail since the CwMember table was
empty. This cascade failure then caused all Opportunity upserts to fail because
Opportunity.primarySalesRepId is FK-constrained to User.cwIdentifier.

Fix: reorder steps so CW Members syncs first, then Users.
2026-04-09 01:04:00 +00:00
HoloPanio 32bba31e72 fix(dalpuri): populate locationId and fix closedFlag on opportunities
- Add ownerLevelRecId -> locationId mapping to opportunity translation
- Include soOppStatus in opportunity query and derive closedFlag from
  status.closedFlag (with fallback to legacy oldCloseFlag field)
- Add locationId sanitization guard in both sync.ts and sync-by-table.ts

Note: departmentId is not available in CW SO_Opportunity table and
remains null for synced records.
2026-04-09 00:22:41 +00:00
HoloPanio 1233535b20 fix(dalpuri): populate userIdentifiersByMemberRecId from CwMember table
When no User accounts have cwMemberId linked, the context map was empty
and all opportunities got primarySalesRepId = null. Now also populate
the map from CwMember rows directly (User-linked entries take precedence),
so rep identifiers resolve correctly regardless of user account linkage.
2026-04-08 23:23:51 +00:00
23 changed files with 563 additions and 71 deletions
+1 -1
@@ -18,7 +18,7 @@ jobs:
bun-version: "1.3.6"
- name: Install dependencies
run: bun install --frozen-lockfile
run: bun install
- name: Generate API Prisma client
run: DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
+1 -1
@@ -18,7 +18,7 @@ jobs:
bun-version: "1.3.6"
- name: Install dependencies
run: bun install --frozen-lockfile
run: bun install
- name: Generate Dalpuri Prisma client (CW MSSQL)
run: DATABASE_URL="sqlserver://localhost:1433;database=dummy;user=dummy;password=dummy;trustServerCertificate=true" bunx prisma generate
+4 -2
@@ -231,9 +231,10 @@ jobs:
run: bun install --frozen-lockfile
- name: Rebuild native modules
run: npm rebuild
run: npm rebuild --ignore-scripts
env:
HUSKY: "0"
HUSKY_SKIP_INSTALL: "1"
- name: Build macOS distributables
run: bun run make:macos
@@ -272,9 +273,10 @@ jobs:
run: bun install --frozen-lockfile
- name: Rebuild native modules
run: npm rebuild
run: npm rebuild --ignore-scripts
env:
HUSKY: "0"
HUSKY_SKIP_INSTALL: "1"
- name: Build Windows distributables
run: bun run make -- --platform win32
+1 -1
@@ -21,7 +21,7 @@ jobs:
bun-version: "1.3.11"
- name: Install dependencies
run: bun install --frozen-lockfile
run: bun install
- name: Run unit tests
run: bun run test:unit -- --run
+34 -8
@@ -17,7 +17,7 @@ COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json
COPY patches ./patches
RUN bun install --frozen-lockfile --production
RUN bun install --production
# ---- Stage 2: Build ----
FROM oven/bun:1.3.11 AS build
@@ -32,7 +32,7 @@ COPY ui/package.json ./ui/package.json
COPY patches ./patches
# Install all deps (including dev) for the full workspace
RUN bun install --frozen-lockfile
RUN bun install
# Copy API source and config
COPY api/src/ ./api/src/
@@ -90,6 +90,13 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
# Copy production node_modules (Prisma adapter needs native bindings)
COPY --from=deps /app/node_modules/ ./node_modules/
# Copy bun so prisma migrate deploy can run at container startup
COPY --from=build /usr/local/bin/bun /usr/local/bin/bun
RUN ln -s /usr/local/bin/bun /usr/local/bin/bunx
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
ENV NODE_ENV=production
# ---- Stage 4: API server runtime image ----
@@ -101,7 +108,7 @@ COPY --from=build /app/api/logo.png ./logo.png
COPY --from=build /app/api/src/modules/sales-utils/salesTaxRates.json ./salesTaxRates.json
EXPOSE 3000
CMD ["./server"]
CMD ["sh", "-c", "bunx prisma migrate deploy && ./server"]
# ---- Stage 5: Worker runtime image ----
FROM runtime-base AS worker
@@ -125,15 +132,13 @@ COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json
COPY patches ./patches
RUN bun install --frozen-lockfile
RUN bun install
COPY api/prisma/ ./api/prisma/
COPY api/prisma.config.ts ./api/prisma.config.ts
RUN chmod +x /app/api/prisma/migrate-entrypoint.sh
WORKDIR /app/api
CMD ["sh", "prisma/migrate-entrypoint.sh"]
CMD ["bunx", "prisma", "migrate", "deploy"]
# ---- Stage 7: Dalpuri CW-to-API sync runner ----
FROM oven/bun:1.3.11 AS dalpuri-sync
@@ -146,7 +151,7 @@ COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json
COPY patches ./patches
RUN bun install --frozen-lockfile
RUN bun install
COPY dalpuri/src/ ./dalpuri/src/
COPY dalpuri/prisma/ ./dalpuri/prisma/
@@ -164,3 +169,24 @@ RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma gen
WORKDIR /app/dalpuri
CMD ["bun", "run", "src/sync.ts"]
FROM oven/bun:1.3.11 AS setup-admin
WORKDIR /app
COPY package.json bun.lock ./
COPY api/package.json ./api/package.json
COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json
COPY patches ./patches
RUN bun install
COPY api/prisma/ ./api/prisma/
COPY api/prisma.config.ts ./api/prisma.config.ts
COPY api/setup-admin.ts ./api/setup-admin.ts
WORKDIR /app/api
RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
CMD ["bun", "run", "setup-admin.ts"]
+5
@@ -20,6 +20,11 @@ spec:
env:
- name: MANAGER_SOCKET_URL
value: "http://optima-api.optima.svc.cluster.local:8671"
- name: API_DATABASE_URL
valueFrom:
secretKeyRef:
name: api-env-secret
key: DATABASE_URL
envFrom:
- secretRef:
name: api-env-secret
+3 -5
@@ -1,11 +1,9 @@
import { defineConfig, env } from 'prisma/config'
export default defineConfig({
export default {
schema: 'prisma/schema.prisma',
migrations: {
path: 'prisma/migrations',
},
datasource: {
url: env('DATABASE_URL'),
url: process.env.DATABASE_URL,
},
})
}
+66
@@ -0,0 +1,66 @@
import { PrismaClient } from './generated/prisma/client';
const prisma = new PrismaClient();
async function main() {
try {
// Create the admin role if it doesn't exist
const adminRole = await prisma.role.upsert({
where: { moniker: 'admin' },
update: {},
create: {
title: 'Administrator',
moniker: 'admin',
permissions: JSON.stringify({
// Full permissions for admin
'*': true,
}),
},
});
console.log('✓ Admin role created/verified:', adminRole);
// Find the user with jackson.roberts@totaltech.net
const user = await prisma.user.findUnique({
where: { email: 'jackson.roberts@totaltech.net' },
include: { roles: true },
});
if (!user) {
console.error(
'✗ User jackson.roberts@totaltech.net not found. Please ensure the user exists.',
);
process.exit(1);
}
console.log('✓ User found:', user.email);
// Check if user already has admin role
const hasAdminRole = user.roles.some((r) => r.moniker === 'admin');
if (hasAdminRole) {
console.log('✓ User already has admin role');
} else {
// Assign admin role to user
const updatedUser = await prisma.user.update({
where: { id: user.id },
data: {
roles: {
connect: { id: adminRole.id },
},
},
include: { roles: true },
});
console.log('✓ Admin role assigned to jackson.roberts@totaltech.net');
console.log('✓ User roles:', updatedUser.roles.map((r) => r.moniker));
}
} catch (error) {
console.error('✗ Error:', error);
process.exit(1);
} finally {
await prisma.$disconnect();
}
}
main();
+1 -1
@@ -2,7 +2,7 @@ import { createRoute } from "../../modules/api-utils/createRoute";
import { apiResponse } from "../../modules/api-utils/apiResponse";
import { ContentfulStatusCode } from "hono/utils/http-status";
import { authMiddleware } from "../middleware/authorization";
import { getBoss } from "../../workert";
import { getBoss } from "../../boss-instance";
import { WorkerQueue } from "../../modules/workers/queues";
/* POST /v1/cw/sync/full */
+30
@@ -0,0 +1,30 @@
/**
* Shared PgBoss singleton — kept in its own module to break circular imports
* between workert.ts and the worker modules that call getBoss().
*/
import { PgBoss } from "pg-boss";
function makePgBossUrl(rawUrl: string): string {
try {
const u = new URL(rawUrl);
// 30-second statement timeout to prevent individual SQL queries from
// hanging indefinitely if the DB server stops responding mid-query.
u.searchParams.set("options", "-c statement_timeout=30000");
return u.toString();
} catch {
return rawUrl;
}
}
export const boss = new PgBoss({
connectionString: makePgBossUrl(process.env.DATABASE_URL!),
connectionTimeoutMillis: 15_000,
});
boss.on("error", (err) => {
console.error("[worker] PgBoss error", err);
});
export function getBoss(): PgBoss {
return boss;
}
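As a quick sanity check of the connection-string rewrite above, the snippet below reproduces what makePgBossUrl does to a typical URL (host and credentials are placeholders, not values from this repository):

// Standalone check of the transformation applied by makePgBossUrl.
const raw = "postgresql://optima:secret@db.internal:5432/optima";
const u = new URL(raw);
u.searchParams.set("options", "-c statement_timeout=30000");
console.log(u.toString());
// Prints the same URL with the timeout appended as a query parameter, e.g.
// postgresql://optima:secret@db.internal:5432/optima?options=-c+statement_timeout%3D30000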
+16 -2
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
return null;
}
function formatOpportunityContactName(
firstName?: string | null,
lastName?: string | null
): string {
const first = (firstName ?? "").trim();
const last = (lastName ?? "").trim();
if (first && last.toLowerCase() === "contact") {
return first;
}
return `${first} ${last}`.trim();
}
/**
* Opportunity Controller
*
@@ -290,7 +304,7 @@ export class OpportunityController {
| null
| undefined;
this.contactName = (data as any).contactName ?? (contactRel
? `${contactRel.firstName} ${contactRel.lastName}`.trim()
? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
: null);
// Site
@@ -674,7 +688,7 @@ export class OpportunityController {
id: contact.id,
contact: {
id: contact.id,
name: `${contact.firstName} ${contact.lastName}`.trim(),
name: formatOpportunityContactName(contact.firstName, contact.lastName),
},
company: contact.company
? {
+2 -1
@@ -6,7 +6,8 @@ import { events } from "./modules/globalEvents";
import { setupEventDebugger } from "./modules/logging/eventDebugger";
import { signPermissions } from "./modules/permission-utils/signPermissions";
import { RoleController } from "./controllers/RoleController";
import { initializeWorkerSystem, getBoss } from "./workert";
import { initializeWorkerSystem } from "./workert";
import { getBoss } from "./boss-instance";
import { WorkerQueue } from "./modules/workers/queues";
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
import { startCommsServer } from "./modules/workers/coms";
+21 -2
@@ -1,5 +1,5 @@
import PdfPrinter from "pdfmake/src/Printer";
import { readFileSync } from "node:fs";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
export interface QuoteLineItem {
@@ -110,7 +110,26 @@ const COMPANY = {
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto");
function resolveRobotoFontDir(): string {
const candidates = [
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
];
for (const dir of candidates) {
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
return dir;
}
}
throw new Error(
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
);
}
const fontDir = resolveRobotoFontDir();
const fonts = {
Roboto: {
normal: join(fontDir, "Roboto-Regular.ttf"),
+1
@@ -1,6 +1,7 @@
import { Server } from "socket.io";
import { events, EventTypes } from "../globalEvents";
import { WorkerQueue } from "./queues";
import { reserveWorkerId } from "../../workert";
function emitGlobalEvent<K extends keyof EventTypes>(
name: K,
+50 -1
@@ -1,5 +1,6 @@
import { Socket } from "socket.io-client";
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
import { prisma } from "../../constants";
/**
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
* Called every 5 seconds via PgBoss from the API process interval.
*/
export async function executeIncrementalSync(): Promise<void> {
return executeForcedIncrementalDalpuriSync();
let jobRunId: string | undefined;
try {
const run = await prisma.syncJobRun.create({
data: {
jobType: "INCREMENTAL_SYNC",
status: "RUNNING",
triggeredBy: "worker",
startedAt: new Date(),
},
select: { id: true },
});
jobRunId = run.id;
} catch (err) {
// Sync should still run even if tracking insert fails.
console.error("[sync] Failed to create incremental SyncJobRun", err);
}
try {
await executeForcedIncrementalDalpuriSync({ jobRunId });
if (jobRunId) {
await prisma.syncJobRun.update({
where: { id: jobRunId },
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});
}
} catch (err) {
if (jobRunId) {
const errorSummary = err instanceof Error ? err.message : String(err);
await prisma.syncJobRun
.update({
where: { id: jobRunId },
data: {
status: "FAILED",
completedAt: new Date(),
errorSummary: errorSummary.slice(0, 2000),
},
})
.catch(() => {
// Best-effort update only.
});
}
throw err;
}
}
+14 -2
@@ -1,4 +1,4 @@
import { getBoss } from "../../workert";
import { getBoss } from "../../boss-instance";
import { WorkerQueue } from "./queues";
/**
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
* Called on an interval from the main API process so it survives worker restarts.
*/
export async function enqueueIncrementalSync(): Promise<void> {
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {});
const jobId = await getBoss().send(
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
{
enqueuedAt: new Date().toISOString(),
},
{
singletonKey: "dalpuri-incremental-sync",
}
);
if (!jobId) {
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
}
}
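The singletonKey option is what makes the jobId null-check above meaningful: pg-boss keeps at most one queued or active job per key and resolves send() to null when it skips a duplicate, which is the case the function logs. Below is a minimal standalone sketch of that behaviour, assuming a reachable Postgres DATABASE_URL; the queue name is a placeholder rather than the WorkerQueue enum value:

import PgBoss from "pg-boss";

async function demoSingletonDedup() {
  const boss = new PgBoss(process.env.DATABASE_URL!);
  await boss.start();
  await boss.createQueue("demo-incremental-sync");
  // First send enqueues the job and resolves to its id.
  const first = await boss.send("demo-incremental-sync", {}, { singletonKey: "demo-incremental-sync" });
  // Second send is skipped while the first job is still queued or active and resolves to null.
  const second = await boss.send("demo-incremental-sync", {}, { singletonKey: "demo-incremental-sync" });
  console.log({ first, second }); // { first: "<job id>", second: null }
  await boss.stop();
}

demoSingletonDedup().catch(console.error);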
+3 -1
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
queueType: WorkerQueue,
workFn: (workerSocket: Socket) => Promise<T>,
): Promise<T> {
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
return new Promise((resolve, reject) => {
// Request a worker ID and namespace from the manager
socket.emit(
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
}
// Connect to the worker-specific namespace
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, {
const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
reconnection: false,
});
+60 -22
@@ -1,13 +1,7 @@
import { PgBoss } from "pg-boss";
import { io, Socket } from "socket.io-client";
import { WorkerQueue } from "./modules/workers/queues";
import { setupEventDebugger } from "./modules/logging/eventDebugger";
const boss = new PgBoss(process.env.DATABASE_URL!);
boss.on("error", (err) => {
console.error("[worker] PgBoss error", err);
});
import { boss, getBoss } from "./boss-instance";
let bossStartPromise: Promise<void> | null = null;
let reservationQueueReady = false;
@@ -111,19 +105,25 @@ export async function reserveWorkerId(queueType: WorkerQueue): Promise<string> {
async function ensureDalpuriSyncQueue(): Promise<void> {
try {
console.log("[worker] Creating DALPURI_FULL_SYNC queue...");
await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC);
} catch {
// Queue may already exist; ignore to keep this idempotent.
console.log("[worker] DALPURI_FULL_SYNC queue ready");
} catch (err) {
console.log("[worker] DALPURI_FULL_SYNC queue already exists (or error):", (err as Error).message);
}
try {
console.log("[worker] Creating DALPURI_INCREMENTAL_SYNC queue...");
await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC);
} catch {
// Queue may already exist; ignore to keep this idempotent.
console.log("[worker] DALPURI_INCREMENTAL_SYNC queue ready");
} catch (err) {
console.log("[worker] DALPURI_INCREMENTAL_SYNC queue already exists (or error):", (err as Error).message);
}
try {
console.log("[worker] Creating REFRESH_SALES_METRICS queue...");
await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS);
} catch {
// Queue may already exist; ignore to keep this idempotent.
console.log("[worker] REFRESH_SALES_METRICS queue ready");
} catch (err) {
console.log("[worker] REFRESH_SALES_METRICS queue already exists (or error):", (err as Error).message);
}
}
@@ -138,14 +138,6 @@ export async function initializeWorkerSystem(): Promise<void> {
console.log("[worker] Worker system initialized - ready for job enqueueing");
}
/**
* Get the PgBoss instance for direct job enqueueing.
* Must call initializeWorkerSystem() first.
*/
export function getBoss(): PgBoss {
return boss;
}
if (import.meta.main) {
// if (Bun.env.NODE_ENV === "development") {
// setupEventDebugger({ processLabel: "WORKER" });
@@ -155,14 +147,33 @@ if (import.meta.main) {
console.log(
`[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}`
);
console.log(`[worker] DATABASE_URL set: ${!!process.env.DATABASE_URL}`);
// Ensure PgBoss is connected and queues exist
await ensureBossStarted();
console.log("[worker] Starting PgBoss...");
try {
await Promise.race([
ensureBossStarted(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("boss.start() timed out after 30s")), 30_000)
),
]);
} catch (err) {
console.error("[worker] FATAL: PgBoss failed to start:", err);
process.exit(1);
}
console.log("[worker] PgBoss started successfully");
console.log("[worker] Ensuring sync queues...");
await ensureDalpuriSyncQueue();
console.log("[worker] Sync queues ready");
// Register job handler for DALPURI_FULL_SYNC
console.log("[worker] Importing sync-manager...");
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
console.log("[worker] Importing dalpuri-sync...");
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
console.log("[worker] Importing incremental-sync...");
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
const socket = await ensureManagerSocketReady();
await enqueueDalpuriFullSync(socket);
@@ -170,10 +181,37 @@ if (import.meta.main) {
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
const startedAt = Date.now();
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
try {
await executeIncrementalSync();
console.log(
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
);
} catch (err) {
console.error(
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
err
);
throw err;
}
});
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
const enqueueIncrementalWithLogging = () => {
enqueueIncrementalSync().catch((err) => {
console.error(
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
);
});
};
// Keep a worker-local 5s scheduler so incremental sync continues even when
// API interval scheduling is unavailable.
enqueueIncrementalWithLogging();
setInterval(enqueueIncrementalWithLogging, 5_000);
console.log("[worker] Started 5-second incremental enqueue interval");
// Register job handler for REFRESH_SALES_METRICS
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
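The startup guard added in the worker entry point above is a plain Promise.race; factored out, the same pattern looks roughly like the helper below (an illustrative sketch, not code from this repository):

// Rejects if the wrapped promise does not settle within ms milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  return Promise.race([
    promise,
    new Promise<never>((_, reject) =>
      setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms)
    ),
  ]);
}

// Usage mirroring the worker entry point: exit so the container restarts instead of hanging.
// await withTimeout(ensureBossStarted(), 30_000, "boss.start()").catch((err) => {
//   console.error("[worker] FATAL: PgBoss failed to start:", err);
//   process.exit(1);
// });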
+2 -2
@@ -621,7 +621,7 @@
"@types/aria-query": ["@types/aria-query@5.0.4", "", {}, "sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw=="],
"@types/bun": ["@types/bun@1.3.11", "", { "dependencies": { "bun-types": "1.3.11" } }, "sha512-5vPne5QvtpjGpsGYXiFyycfpDF2ECyPcTSsFBMa0fraoxiQyMJ3SmuQIGhzPg2WJuWxVBoxWJ2kClYTcw/4fAg=="],
"@types/bun": ["@types/bun@1.3.12", "", { "dependencies": { "bun-types": "1.3.12" } }, "sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A=="],
"@types/cacheable-request": ["@types/cacheable-request@6.0.3", "", { "dependencies": { "@types/http-cache-semantics": "*", "@types/keyv": "^3.1.4", "@types/node": "*", "@types/responselike": "^1.0.0" } }, "sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw=="],
@@ -841,7 +841,7 @@
"buffer-from": ["buffer-from@1.1.2", "", {}, "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ=="],
"bun-types": ["bun-types@1.3.11", "", { "dependencies": { "@types/node": "*" } }, "sha512-1KGPpoxQWl9f6wcZh57LvrPIInQMn2TQ7jsgxqpRzg+l0QPOFvJVH7HmvHo/AiPgwXy+/Thf6Ov3EdVn1vOabg=="],
"bun-types": ["bun-types@1.3.12", "", { "dependencies": { "@types/node": "*" } }, "sha512-HqOLj5PoFajAQciOMRiIZGNoKxDJSr6qigAttOX40vJuSp6DN/CxWp9s3C1Xwm4oH7ybueITwiaOcWXoYVoRkA=="],
"bundle-name": ["bundle-name@4.1.0", "", { "dependencies": { "run-applescript": "^7.0.0" } }, "sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q=="],
+27
@@ -294,6 +294,22 @@ const refreshContextFromApi = async (
}
}
const cwMembers = await apiPrisma.cwMember.findMany({
select: { cwMemberId: true, identifier: true },
});
for (const member of cwMembers) {
if (
member.cwMemberId != null &&
member.identifier &&
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
) {
context.userIdentifiersByMemberRecId.set(
member.cwMemberId,
member.identifier
);
}
}
for (const board of boards) {
context.serviceTicketBoardUidsById.set(board.id, board.uid);
}
@@ -426,6 +442,12 @@ const sanitizeModelData = (
) {
sanitized.statusId = null;
}
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
}
if (targetModel === "schedule") {
@@ -734,6 +756,11 @@ const getConfigForTable = (table: string): SyncTableConfig | null => {
secondarySalesFlag: true,
},
},
soOppStatus: {
select: {
closedFlag: true,
},
},
},
},
},
+208 -15
@@ -75,6 +75,136 @@ type DeleteResult = {
let incrementalDeleteStepIndex = 0;
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]);
const CRITICAL_CW_WATERMARK_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]);
const criticalFullSyncIntervalMinutes = Math.max(
1,
Number.parseInt(
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
10
) || 15
);
const CRITICAL_FULL_SYNC_INTERVAL_MS =
criticalFullSyncIntervalMinutes * 60 * 1000;
const criticalCwWatermarkOverlapSeconds = Math.max(
5,
Number.parseInt(
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
10
) || 60
);
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
criticalCwWatermarkOverlapSeconds * 1000;
const criticalCwDeltaLimit = Math.max(
100,
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
5000
);
const lastCriticalFullSyncByStep = new Map<string, number>();
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
const shouldForceCriticalFullSync = (
step: Step,
forceIncremental: boolean
): boolean => {
if (!forceIncremental) return false;
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
const now = Date.now();
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
return false;
}
lastCriticalFullSyncByStep.set(step.name, now);
return true;
};
const computeCriticalCwWatermarkDecision = async (
cwPrisma: CwPrismaClient,
step: Step,
forceIncremental: boolean
): Promise<SmartSyncDecision | null> => {
if (!forceIncremental) return null;
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
const cwDelegate = (
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
)[step.sourceModel];
if (!cwDelegate) {
return null;
}
const existingWhere =
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
const lowerBound = lastWatermark
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
const rows = (await cwDelegate.findMany({
select: {
[step.sourceIdField]: true,
[step.sourceUpdatedField]: true,
},
where: {
...(existingWhere as Record<string, unknown>),
[step.sourceUpdatedField]: {
gte: lowerBound,
},
},
orderBy: { [step.sourceUpdatedField]: "asc" },
take: criticalCwDeltaLimit,
})) as Row[];
if (rows.length >= criticalCwDeltaLimit) {
console.warn(
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
);
return { mode: "full", differences: [] };
}
if (rows.length > 0) {
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
if (latest) {
lastCriticalCwWatermarkByStep.set(step.name, latest);
}
} else if (!lastWatermark) {
lastCriticalCwWatermarkByStep.set(step.name, new Date());
}
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
console.log(
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
);
return {
mode: "incremental",
sourceIds,
differences: [],
};
};
const parseEnvFile = (path: string): Record<string, string> => {
const envData = readFileSync(path, "utf8");
const out: Record<string, string> = {};
@@ -107,6 +237,20 @@ const resolveApiDatabaseUrl = (): string => {
if (process.env.OPTIMA_API_DATABASE_URL)
return process.env.OPTIMA_API_DATABASE_URL;
// Worker/runtime fallback:
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
return process.env.DATABASE_URL;
}
if (
process.env.DATABASE_URL &&
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
) {
return process.env.DATABASE_URL;
}
const candidates = [
resolve(import.meta.dir, "../../api/.env"),
resolve(process.cwd(), "../api/.env"),
@@ -323,6 +467,22 @@ const refreshContextFromApi = async (
}
}
const cwMembers = await apiPrisma.cwMember.findMany({
select: { cwMemberId: true, identifier: true },
});
for (const member of cwMembers) {
if (
member.cwMemberId != null &&
member.identifier &&
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
) {
context.userIdentifiersByMemberRecId.set(
member.cwMemberId,
member.identifier
);
}
}
for (const board of boards) {
context.serviceTicketBoardUidsById.set(board.id, board.uid);
}
@@ -636,6 +796,13 @@ const sanitizeModelData = (
) {
sanitized.stageId = null;
}
// Nullify locationId if the corporate location doesn't exist
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
// Nullify taxCodeId if the tax code hasn't synced yet
if (
sanitized.taxCodeId != null &&
@@ -1328,6 +1495,15 @@ export const executeFullDalpuriSync = async (options?: {
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
const steps: Step[] = [
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{
name: "Users",
sourceModel: "member",
@@ -1342,15 +1518,6 @@ export const executeFullDalpuriSync = async (options?: {
},
},
},
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{
name: "Companies",
sourceModel: "company",
@@ -1585,6 +1752,11 @@ export const executeFullDalpuriSync = async (options?: {
secondarySalesFlag: true,
},
},
soOppStatus: {
select: {
closedFlag: true,
},
},
},
},
},
@@ -1729,19 +1901,40 @@ export const executeFullDalpuriSync = async (options?: {
step,
forceIncremental
);
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
cwPrisma,
step,
forceIncremental
);
const forceCriticalFullSync = shouldForceCriticalFullSync(
step,
forceIncremental
);
const effectiveDecision = forceCriticalFullSync
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
: criticalWatermarkDecision ?? decision;
if (forceCriticalFullSync) {
console.log(
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
);
}
const sourceIdsFilter =
decision.mode === "incremental" ? decision.sourceIds : undefined;
effectiveDecision.mode === "incremental"
? effectiveDecision.sourceIds
: undefined;
console.log(
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
decision.mode
effectiveDecision.mode
}${
decision.mode === "incremental"
? ` (${decision.sourceIds.length} ids)`
effectiveDecision.mode === "incremental"
? ` (${effectiveDecision.sourceIds.length} ids)`
: ""
}`
);
if (logAllDifferences) {
logAllSmartSyncDifferences(step, decision.differences);
logAllSmartSyncDifferences(step, effectiveDecision.differences);
}
const result = await syncStep(
cwPrisma,
@@ -1763,7 +1956,7 @@ export const executeFullDalpuriSync = async (options?: {
await writeStepLog(
step.name,
decision.mode,
effectiveDecision.mode,
result,
{ deleted: 0, failed: 0 },
Date.now() - stepStart
+1 -1
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
{
from: "lastName",
to: "lastName",
process: (value) => (value ? value : "Contact"),
process: (value) => (value ? value : ""),
},
{ from: "nickName", to: "nickname" },
{ from: "title", to: "title" },
+10 -1
@@ -1,6 +1,7 @@
import {
Opportunity as CwOpportunity,
OpportunityMember as CwOpportunityMember,
SoOppStatus as CwSoOppStatus,
} from "../../generated/prisma/client";
import { OpportunityInterest } from "../../../api/generated/prisma/client";
import { Translation, skipRow } from "./types";
@@ -30,6 +31,7 @@ type ApiOpportunityRecord = {
dateBecameLead?: Date | null;
closedDate?: Date | null;
closedFlag: boolean;
locationId?: number | null;
closedById?: string | null;
updatedBy: string;
eneteredBy: string;
@@ -42,6 +44,7 @@ type CwOpportunityWithMembers = CwOpportunity & {
CwOpportunityMember,
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
>[];
soOppStatus?: Pick<CwSoOppStatus, "closedFlag"> | null;
};
const toInterest = (value: number | null): OpportunityInterest | null => {
@@ -119,13 +122,19 @@ export const opportunityTranslation: Translation<
},
{ from: "companyRecId", to: "companyId" },
{ from: "contactRecId", to: "contactId" },
{ from: "ownerLevelRecId", to: "locationId" },
{ from: "companyAddressRecId", to: "siteId" },
{ from: "poNumber", to: "customerPO" },
{ from: "dateCloseExpected", to: "expectedCloseDate" },
{ from: "datePipelineChange", to: "pipelineChangeDate" },
{ from: "dateBecameLead", to: "dateBecameLead" },
{ from: "dateClosed", to: "closedDate" },
{ from: "oldCloseFlag", to: "closedFlag" },
{
from: "oldCloseFlag",
to: "closedFlag",
process: (_value, _context, row) =>
row.soOppStatus?.closedFlag ?? row.oldCloseFlag ?? false,
},
{ from: "closedBy", to: "closedById" },
{
from: "updatedBy",