Compare commits

...

31 Commits

Author SHA1 Message Date
HoloPanio a8c48e8c75 fix: correct prisma client import path in setup-admin 2026-04-14 03:18:58 +00:00
HoloPanio 051edb5f78 chore: add setup-admin script 2026-04-14 03:06:39 +00:00
HoloPanio f87f6dd336 chore: add setup-admin dockerfile stage 2026-04-14 02:52:51 +00:00
HoloPanio 2eb387811d fix(worker): break circular import by extracting PgBoss singleton
incremental-sync.ts and api/cw/sync.ts imported getBoss() from workert.ts.
When workert.ts (the entry point) dynamically imported incremental-sync.ts,
it triggered a circular module re-evaluation that hung indefinitely.

Extract the PgBoss singleton and getBoss() factory to a new boss-instance.ts
module that neither has top-level async side-effects nor imports from
workert.ts. All consumers (workert.ts, index.ts, incremental-sync.ts,
cw/sync.ts) now import from boss-instance.ts instead.
2026-04-14 00:34:33 +00:00
HoloPanio db27c9224d fix(worker): add granular debug logging to isolate startup hang
Add console.log before/after each createQueue() call and dynamic
import to pinpoint exactly where the worker startup is blocking.
2026-04-14 00:12:20 +00:00
HoloPanio 7f6e6fdfbc fix(worker): add PgBoss startup timeouts and debug logging
- Add statement_timeout=30000ms to PgBoss connection URL to prevent
  SQL queries from hanging indefinitely
- Add connectionTimeoutMillis=15s to PgBoss config for connection timeout
- Wrap boss.start() in 30s Promise.race timeout with process.exit(1)
  on failure to ensure container restarts instead of hanging silently
- Add debug logging around PgBoss startup to diagnose connection issues
2026-04-13 23:53:32 +00:00
HoloPanio 5f5f610060 fix: remove prisma/config import; use plain export in prisma.config.ts 2026-04-13 21:35:34 +00:00
HoloPanio 809841d672 fix: add url = env(DATABASE_URL) to prisma schema datasource 2026-04-13 21:31:43 +00:00
HoloPanio 276eb563bf fix: remove prisma.config.ts from runtime image (use defaults) 2026-04-13 21:26:40 +00:00
HoloPanio 7624ba0bc0 fix: add bunx symlink to runtime Docker image 2026-04-13 21:18:01 +00:00
HoloPanio 1063231107 chore: update bun.lock (@types/bun 1.3.11 -> 1.3.12) 2026-04-13 21:11:25 +00:00
Jackson 2cd5dee612 Merge pull request #3 from HorizonStackSoftware/copilot/remove-prisma-script
Replace migrate-entrypoint.sh with direct Prisma commands in Dockerfile
2026-04-12 10:37:33 -05:00
copilot-swe-agent[bot] 8ac1cbaf3e chore: replace migrate-entrypoint.sh with direct prisma commands in Dockerfile
Agent-Logs-Url: https://github.com/HorizonStackSoftware/optima/sessions/eb8e2182-3a0d-4a9c-ad4f-4d1d9cf8a923

Co-authored-by: HoloPanio <30759238+HoloPanio@users.noreply.github.com>
2026-04-12 15:34:53 +00:00
Jackson bd7e6a37cd Merge pull request #2 from HorizonStackSoftware/copilot/remove-frozen-lockfile-params
Remove --frozen-lockfile from test workflows
2026-04-12 09:44:53 -05:00
copilot-swe-agent[bot] 4e0799f9d9 Remove --frozen-lockfile from test workflow files
Agent-Logs-Url: https://github.com/HorizonStackSoftware/optima/sessions/8b3e4db9-a1bf-44c4-98fc-3304890cb3f4

Co-authored-by: HoloPanio <30759238+HoloPanio@users.noreply.github.com>
2026-04-12 14:41:14 +00:00
Jackson 223a06ba27 Merge pull request #1 from HorizonStackSoftware/copilot/add-post-build-command-for-migration
Run Prisma migrations automatically on API container startup
2026-04-12 09:38:45 -05:00
copilot-swe-agent[bot] 503657d168 feat: run prisma migrate deploy on api container startup
Agent-Logs-Url: https://github.com/HorizonStackSoftware/optima/sessions/509d6156-c474-457b-9627-82f7b2f13158

Co-authored-by: HoloPanio <30759238+HoloPanio@users.noreply.github.com>
2026-04-12 14:34:58 +00:00
Jackson cf68e281e8 Update bun install commands in Dockerfile
Removed the --frozen-lockfile option from bun install commands in the Dockerfile.
2026-04-12 09:21:27 -05:00
HoloPanio 57b5763d41 fix(opportunity): remove synthetic Contact suffix in contact field 2026-04-10 05:09:40 +00:00
HoloPanio 2bd498a35d fix(sync): use CW watermark incremental path for critical tables 2026-04-10 04:53:57 +00:00
HoloPanio 86d7426e8b fix(sync): harden incremental observability and periodic reconciliation 2026-04-10 04:36:36 +00:00
HoloPanio afe56393e7 fix(sync): restore worker incremental API DB resolution 2026-04-10 04:07:27 +00:00
HoloPanio b2cd26af30 fix(release): unblock deploy workflow image build and desktop rebuild 2026-04-10 03:44:33 +00:00
HoloPanio 0594816ea4 fix(api): include pdfmake Roboto fonts in runtime image 2026-04-10 03:00:31 +00:00
HoloPanio 71fe36c0b8 fix(worker): restore reliable 5s incremental sync cadence 2026-04-10 01:00:04 +00:00
HoloPanio e0d575454e fix(dalpuri): sync CW Members before Users to resolve FK ordering issue
User rows have a FK constraint to CwMember (User_cwMemberId_fkey). Syncing
Users first caused all 140 User upserts to fail since the CwMember table was
empty. This cascade failure then caused all Opportunity upserts to fail because
Opportunity.primarySalesRepId is FK-constrained to User.cwIdentifier.

Fix: reorder steps so CW Members syncs first, then Users.
2026-04-09 01:04:00 +00:00
HoloPanio 32bba31e72 fix(dalpuri): populate locationId and fix closedFlag on opportunities
- Add ownerLevelRecId -> locationId mapping to opportunity translation
- Include soOppStatus in opportunity query and derive closedFlag from
  status.closedFlag (with fallback to legacy oldCloseFlag field)
- Add locationId sanitization guard in both sync.ts and sync-by-table.ts

Note: departmentId is not available in CW SO_Opportunity table and
remains null for synced records.
2026-04-09 00:22:41 +00:00
HoloPanio 1233535b20 fix(dalpuri): populate userIdentifiersByMemberRecId from CwMember table
When no User accounts have cwMemberId linked, the context map was empty
and all opportunities got primarySalesRepId = null. Now also populate
the map from CwMember rows directly (User-linked entries take precedence),
so rep identifiers resolve correctly regardless of user account linkage.
2026-04-08 23:23:51 +00:00
HoloPanio 2c737b22f1 fix(dalpuri): exit(0) after sync completes to release k8s job
Prisma MSSQL adapter keeps connections open after the sync finishes,
preventing the process from exiting naturally. The k8s job was staying
in Running state indefinitely. Call process.exit(0) on success so the
job completes and the GH workflow step passes.
2026-04-08 21:50:52 +00:00
HoloPanio a3bfe9f374 fix(ci): increase dalpuri sync timeout from 30min to 2h
Full initial sync has 500k+ rows across all tables and exceeded the
30-minute activeDeadlineSeconds. Bump both the k8s job deadline and
the kubectl wait timeout to 7200s (2 hours).
2026-04-08 21:19:43 +00:00
HoloPanio a106bb15a8 fix(ci): explicit env vars in dalpuri sync job; add CW_DATABASE_URL to secret
envFrom was loading api-env-secret but CW_DATABASE_URL was absent from the
deployed secret, causing sync.ts to fall back to DATABASE_URL (Postgres) as
the MSSQL connection string -> 'Invalid port number: //optima'.

- Replaced envFrom with explicit CW_DATABASE_URL and API_DATABASE_URL env
  entries so the mapping is unambiguous
- Patched api-env-secret in cluster to add CW_DATABASE_URL
2026-04-08 20:41:49 +00:00
24 changed files with 579 additions and 81 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ jobs:
bun-version: "1.3.6" bun-version: "1.3.6"
- name: Install dependencies - name: Install dependencies
run: bun install --frozen-lockfile run: bun install
- name: Generate API Prisma client - name: Generate API Prisma client
run: DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate run: DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
+1 -1
View File
@@ -18,7 +18,7 @@ jobs:
bun-version: "1.3.6" bun-version: "1.3.6"
- name: Install dependencies - name: Install dependencies
run: bun install --frozen-lockfile run: bun install
- name: Generate Dalpuri Prisma client (CW MSSQL) - name: Generate Dalpuri Prisma client (CW MSSQL)
run: DATABASE_URL="sqlserver://localhost:1433;database=dummy;user=dummy;password=dummy;trustServerCertificate=true" bunx prisma generate run: DATABASE_URL="sqlserver://localhost:1433;database=dummy;user=dummy;password=dummy;trustServerCertificate=true" bunx prisma generate
+6 -4
View File
@@ -231,9 +231,10 @@ jobs:
run: bun install --frozen-lockfile run: bun install --frozen-lockfile
- name: Rebuild native modules - name: Rebuild native modules
run: npm rebuild run: npm rebuild --ignore-scripts
env: env:
HUSKY: "0" HUSKY: "0"
HUSKY_SKIP_INSTALL: "1"
- name: Build macOS distributables - name: Build macOS distributables
run: bun run make:macos run: bun run make:macos
@@ -272,9 +273,10 @@ jobs:
run: bun install --frozen-lockfile run: bun install --frozen-lockfile
- name: Rebuild native modules - name: Rebuild native modules
run: npm rebuild run: npm rebuild --ignore-scripts
env: env:
HUSKY: "0" HUSKY: "0"
HUSKY_SKIP_INSTALL: "1"
- name: Build Windows distributables - name: Build Windows distributables
run: bun run make -- --platform win32 run: bun run make -- --platform win32
@@ -318,9 +320,9 @@ jobs:
TAG=${{ github.event.release.tag_name }} TAG=${{ github.event.release.tag_name }}
JOB="job/dalpuri-sync-${TAG}" JOB="job/dalpuri-sync-${TAG}"
kubectl wait --for=condition=complete --timeout=1800s -n optima "$JOB" & kubectl wait --for=condition=complete --timeout=7200s -n optima "$JOB" &
WAIT_COMPLETE=$! WAIT_COMPLETE=$!
kubectl wait --for=condition=failed --timeout=1800s -n optima "$JOB" & kubectl wait --for=condition=failed --timeout=7200s -n optima "$JOB" &
WAIT_FAILED=$! WAIT_FAILED=$!
wait -n $WAIT_COMPLETE $WAIT_FAILED wait -n $WAIT_COMPLETE $WAIT_FAILED
+1 -1
View File
@@ -21,7 +21,7 @@ jobs:
bun-version: "1.3.11" bun-version: "1.3.11"
- name: Install dependencies - name: Install dependencies
run: bun install --frozen-lockfile run: bun install
- name: Run unit tests - name: Run unit tests
run: bun run test:unit -- --run run: bun run test:unit -- --run
+34 -8
View File
@@ -17,7 +17,7 @@ COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json COPY ui/package.json ./ui/package.json
COPY patches ./patches COPY patches ./patches
RUN bun install --frozen-lockfile --production RUN bun install --production
# ---- Stage 2: Build ---- # ---- Stage 2: Build ----
FROM oven/bun:1.3.11 AS build FROM oven/bun:1.3.11 AS build
@@ -32,7 +32,7 @@ COPY ui/package.json ./ui/package.json
COPY patches ./patches COPY patches ./patches
# Install all deps (including dev) for the full workspace # Install all deps (including dev) for the full workspace
RUN bun install --frozen-lockfile RUN bun install
# Copy API source and config # Copy API source and config
COPY api/src/ ./api/src/ COPY api/src/ ./api/src/
@@ -90,6 +90,13 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
# Copy production node_modules (Prisma adapter needs native bindings) # Copy production node_modules (Prisma adapter needs native bindings)
COPY --from=deps /app/node_modules/ ./node_modules/ COPY --from=deps /app/node_modules/ ./node_modules/
# Copy bun so prisma migrate deploy can run at container startup
COPY --from=build /usr/local/bin/bun /usr/local/bin/bun
RUN ln -s /usr/local/bin/bun /usr/local/bin/bunx
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
ENV NODE_ENV=production ENV NODE_ENV=production
# ---- Stage 4: API server runtime image ---- # ---- Stage 4: API server runtime image ----
@@ -101,7 +108,7 @@ COPY --from=build /app/api/logo.png ./logo.png
COPY --from=build /app/api/src/modules/sales-utils/salesTaxRates.json ./salesTaxRates.json COPY --from=build /app/api/src/modules/sales-utils/salesTaxRates.json ./salesTaxRates.json
EXPOSE 3000 EXPOSE 3000
CMD ["./server"] CMD ["sh", "-c", "bunx prisma migrate deploy && ./server"]
# ---- Stage 5: Worker runtime image ---- # ---- Stage 5: Worker runtime image ----
FROM runtime-base AS worker FROM runtime-base AS worker
@@ -125,15 +132,13 @@ COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json COPY ui/package.json ./ui/package.json
COPY patches ./patches COPY patches ./patches
RUN bun install --frozen-lockfile RUN bun install
COPY api/prisma/ ./api/prisma/ COPY api/prisma/ ./api/prisma/
COPY api/prisma.config.ts ./api/prisma.config.ts COPY api/prisma.config.ts ./api/prisma.config.ts
RUN chmod +x /app/api/prisma/migrate-entrypoint.sh
WORKDIR /app/api WORKDIR /app/api
CMD ["sh", "prisma/migrate-entrypoint.sh"] CMD ["bunx", "prisma", "migrate", "deploy"]
# ---- Stage 7: Dalpuri CW-to-API sync runner ---- # ---- Stage 7: Dalpuri CW-to-API sync runner ----
FROM oven/bun:1.3.11 AS dalpuri-sync FROM oven/bun:1.3.11 AS dalpuri-sync
@@ -146,7 +151,7 @@ COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json COPY ui/package.json ./ui/package.json
COPY patches ./patches COPY patches ./patches
RUN bun install --frozen-lockfile RUN bun install
COPY dalpuri/src/ ./dalpuri/src/ COPY dalpuri/src/ ./dalpuri/src/
COPY dalpuri/prisma/ ./dalpuri/prisma/ COPY dalpuri/prisma/ ./dalpuri/prisma/
@@ -164,3 +169,24 @@ RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma gen
WORKDIR /app/dalpuri WORKDIR /app/dalpuri
CMD ["bun", "run", "src/sync.ts"] CMD ["bun", "run", "src/sync.ts"]
FROM oven/bun:1.3.11 AS setup-admin
WORKDIR /app
COPY package.json bun.lock ./
COPY api/package.json ./api/package.json
COPY dalpuri/package.json ./dalpuri/package.json
COPY ui/package.json ./ui/package.json
COPY patches ./patches
RUN bun install
COPY api/prisma/ ./api/prisma/
COPY api/prisma.config.ts ./api/prisma.config.ts
COPY api/setup-admin.ts ./api/setup-admin.ts
WORKDIR /app/api
RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
CMD ["bun", "run", "setup-admin.ts"]
+5
View File
@@ -20,6 +20,11 @@ spec:
env: env:
- name: MANAGER_SOCKET_URL - name: MANAGER_SOCKET_URL
value: "http://optima-api.optima.svc.cluster.local:8671" value: "http://optima-api.optima.svc.cluster.local:8671"
- name: API_DATABASE_URL
valueFrom:
secretKeyRef:
name: api-env-secret
key: DATABASE_URL
envFrom: envFrom:
- secretRef: - secretRef:
name: api-env-secret name: api-env-secret
+3 -5
View File
@@ -1,11 +1,9 @@
import { defineConfig, env } from 'prisma/config' export default {
export default defineConfig({
schema: 'prisma/schema.prisma', schema: 'prisma/schema.prisma',
migrations: { migrations: {
path: 'prisma/migrations', path: 'prisma/migrations',
}, },
datasource: { datasource: {
url: env('DATABASE_URL'), url: process.env.DATABASE_URL,
}, },
}) }
+66
View File
@@ -0,0 +1,66 @@
import { PrismaClient } from './generated/prisma/client';
const prisma = new PrismaClient();
async function main() {
try {
// Create the admin role if it doesn't exist
const adminRole = await prisma.role.upsert({
where: { moniker: 'admin' },
update: {},
create: {
title: 'Administrator',
moniker: 'admin',
permissions: JSON.stringify({
// Full permissions for admin
'*': true,
}),
},
});
console.log('✓ Admin role created/verified:', adminRole);
// Find the user with jackson.roberts@totaltech.net
const user = await prisma.user.findUnique({
where: { email: 'jackson.roberts@totaltech.net' },
include: { roles: true },
});
if (!user) {
console.error(
'✗ User jackson.roberts@totaltech.net not found. Please ensure the user exists.',
);
process.exit(1);
}
console.log('✓ User found:', user.email);
// Check if user already has admin role
const hasAdminRole = user.roles.some((r) => r.moniker === 'admin');
if (hasAdminRole) {
console.log('✓ User already has admin role');
} else {
// Assign admin role to user
const updatedUser = await prisma.user.update({
where: { id: user.id },
data: {
roles: {
connect: { id: adminRole.id },
},
},
include: { roles: true },
});
console.log('✓ Admin role assigned to jackson.roberts@totaltech.net');
console.log('✓ User roles:', updatedUser.roles.map((r) => r.moniker));
}
} catch (error) {
console.error('✗ Error:', error);
process.exit(1);
} finally {
await prisma.$disconnect();
}
}
main();
+1 -1
View File
@@ -2,7 +2,7 @@ import { createRoute } from "../../modules/api-utils/createRoute";
import { apiResponse } from "../../modules/api-utils/apiResponse"; import { apiResponse } from "../../modules/api-utils/apiResponse";
import { ContentfulStatusCode } from "hono/utils/http-status"; import { ContentfulStatusCode } from "hono/utils/http-status";
import { authMiddleware } from "../middleware/authorization"; import { authMiddleware } from "../middleware/authorization";
import { getBoss } from "../../workert"; import { getBoss } from "../../boss-instance";
import { WorkerQueue } from "../../modules/workers/queues"; import { WorkerQueue } from "../../modules/workers/queues";
/* POST /v1/cw/sync/full */ /* POST /v1/cw/sync/full */
+30
View File
@@ -0,0 +1,30 @@
/**
* Shared PgBoss singleton — kept in its own module to break circular imports
* between workert.ts and the worker modules that call getBoss().
*/
import { PgBoss } from "pg-boss";
function makePgBossUrl(rawUrl: string): string {
try {
const u = new URL(rawUrl);
// 30-second statement timeout to prevent individual SQL queries from
// hanging indefinitely if the DB server stops responding mid-query.
u.searchParams.set("options", "-c statement_timeout=30000");
return u.toString();
} catch {
return rawUrl;
}
}
export const boss = new PgBoss({
connectionString: makePgBossUrl(process.env.DATABASE_URL!),
connectionTimeoutMillis: 15_000,
});
boss.on("error", (err) => {
console.error("[worker] PgBoss error", err);
});
export function getBoss(): PgBoss {
return boss;
}
+16 -2
View File
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
return null; return null;
} }
function formatOpportunityContactName(
firstName?: string | null,
lastName?: string | null
): string {
const first = (firstName ?? "").trim();
const last = (lastName ?? "").trim();
if (first && last.toLowerCase() === "contact") {
return first;
}
return `${first} ${last}`.trim();
}
/** /**
* Opportunity Controller * Opportunity Controller
* *
@@ -290,7 +304,7 @@ export class OpportunityController {
| null | null
| undefined; | undefined;
this.contactName = (data as any).contactName ?? (contactRel this.contactName = (data as any).contactName ?? (contactRel
? `${contactRel.firstName} ${contactRel.lastName}`.trim() ? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
: null); : null);
// Site // Site
@@ -674,7 +688,7 @@ export class OpportunityController {
id: contact.id, id: contact.id,
contact: { contact: {
id: contact.id, id: contact.id,
name: `${contact.firstName} ${contact.lastName}`.trim(), name: formatOpportunityContactName(contact.firstName, contact.lastName),
}, },
company: contact.company company: contact.company
? { ? {
+2 -1
View File
@@ -6,7 +6,8 @@ import { events } from "./modules/globalEvents";
import { setupEventDebugger } from "./modules/logging/eventDebugger"; import { setupEventDebugger } from "./modules/logging/eventDebugger";
import { signPermissions } from "./modules/permission-utils/signPermissions"; import { signPermissions } from "./modules/permission-utils/signPermissions";
import { RoleController } from "./controllers/RoleController"; import { RoleController } from "./controllers/RoleController";
import { initializeWorkerSystem, getBoss } from "./workert"; import { initializeWorkerSystem } from "./workert";
import { getBoss } from "./boss-instance";
import { WorkerQueue } from "./modules/workers/queues"; import { WorkerQueue } from "./modules/workers/queues";
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync"; import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
import { startCommsServer } from "./modules/workers/coms"; import { startCommsServer } from "./modules/workers/coms";
+21 -2
View File
@@ -1,5 +1,5 @@
import PdfPrinter from "pdfmake/src/Printer"; import PdfPrinter from "pdfmake/src/Printer";
import { readFileSync } from "node:fs"; import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
export interface QuoteLineItem { export interface QuoteLineItem {
@@ -110,7 +110,26 @@ const COMPANY = {
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png"); const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"); function resolveRobotoFontDir(): string {
const candidates = [
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
];
for (const dir of candidates) {
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
return dir;
}
}
throw new Error(
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
);
}
const fontDir = resolveRobotoFontDir();
const fonts = { const fonts = {
Roboto: { Roboto: {
normal: join(fontDir, "Roboto-Regular.ttf"), normal: join(fontDir, "Roboto-Regular.ttf"),
+1
View File
@@ -1,6 +1,7 @@
import { Server } from "socket.io"; import { Server } from "socket.io";
import { events, EventTypes } from "../globalEvents"; import { events, EventTypes } from "../globalEvents";
import { WorkerQueue } from "./queues"; import { WorkerQueue } from "./queues";
import { reserveWorkerId } from "../../workert";
function emitGlobalEvent<K extends keyof EventTypes>( function emitGlobalEvent<K extends keyof EventTypes>(
name: K, name: K,
+50 -1
View File
@@ -1,5 +1,6 @@
import { Socket } from "socket.io-client"; import { Socket } from "socket.io-client";
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri"; import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
import { prisma } from "../../constants";
/** /**
* Execute a full sync from Dalpuri (ConnectWise) to the API database. * Execute a full sync from Dalpuri (ConnectWise) to the API database.
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
* Called every 5 seconds via PgBoss from the API process interval. * Called every 5 seconds via PgBoss from the API process interval.
*/ */
export async function executeIncrementalSync(): Promise<void> { export async function executeIncrementalSync(): Promise<void> {
return executeForcedIncrementalDalpuriSync(); let jobRunId: string | undefined;
try {
const run = await prisma.syncJobRun.create({
data: {
jobType: "INCREMENTAL_SYNC",
status: "RUNNING",
triggeredBy: "worker",
startedAt: new Date(),
},
select: { id: true },
});
jobRunId = run.id;
} catch (err) {
// Sync should still run even if tracking insert fails.
console.error("[sync] Failed to create incremental SyncJobRun", err);
}
try {
await executeForcedIncrementalDalpuriSync({ jobRunId });
if (jobRunId) {
await prisma.syncJobRun.update({
where: { id: jobRunId },
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});
}
} catch (err) {
if (jobRunId) {
const errorSummary = err instanceof Error ? err.message : String(err);
await prisma.syncJobRun
.update({
where: { id: jobRunId },
data: {
status: "FAILED",
completedAt: new Date(),
errorSummary: errorSummary.slice(0, 2000),
},
})
.catch(() => {
// Best-effort update only.
});
}
throw err;
}
} }
+14 -2
View File
@@ -1,4 +1,4 @@
import { getBoss } from "../../workert"; import { getBoss } from "../../boss-instance";
import { WorkerQueue } from "./queues"; import { WorkerQueue } from "./queues";
/** /**
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
* Called on an interval from the main API process so it survives worker restarts. * Called on an interval from the main API process so it survives worker restarts.
*/ */
export async function enqueueIncrementalSync(): Promise<void> { export async function enqueueIncrementalSync(): Promise<void> {
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {}); const jobId = await getBoss().send(
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
{
enqueuedAt: new Date().toISOString(),
},
{
singletonKey: "dalpuri-incremental-sync",
}
);
if (!jobId) {
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
}
} }
+3 -1
View File
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
queueType: WorkerQueue, queueType: WorkerQueue,
workFn: (workerSocket: Socket) => Promise<T>, workFn: (workerSocket: Socket) => Promise<T>,
): Promise<T> { ): Promise<T> {
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
// Request a worker ID and namespace from the manager // Request a worker ID and namespace from the manager
socket.emit( socket.emit(
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
} }
// Connect to the worker-specific namespace // Connect to the worker-specific namespace
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, { const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
reconnection: false, reconnection: false,
}); });
+61 -23
View File
@@ -1,13 +1,7 @@
import { PgBoss } from "pg-boss";
import { io, Socket } from "socket.io-client"; import { io, Socket } from "socket.io-client";
import { WorkerQueue } from "./modules/workers/queues"; import { WorkerQueue } from "./modules/workers/queues";
import { setupEventDebugger } from "./modules/logging/eventDebugger"; import { setupEventDebugger } from "./modules/logging/eventDebugger";
import { boss, getBoss } from "./boss-instance";
const boss = new PgBoss(process.env.DATABASE_URL!);
boss.on("error", (err) => {
console.error("[worker] PgBoss error", err);
});
let bossStartPromise: Promise<void> | null = null; let bossStartPromise: Promise<void> | null = null;
let reservationQueueReady = false; let reservationQueueReady = false;
@@ -111,19 +105,25 @@ export async function reserveWorkerId(queueType: WorkerQueue): Promise<string> {
async function ensureDalpuriSyncQueue(): Promise<void> { async function ensureDalpuriSyncQueue(): Promise<void> {
try { try {
console.log("[worker] Creating DALPURI_FULL_SYNC queue...");
await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC); await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC);
} catch { console.log("[worker] DALPURI_FULL_SYNC queue ready");
// Queue may already exist; ignore to keep this idempotent. } catch (err) {
console.log("[worker] DALPURI_FULL_SYNC queue already exists (or error):", (err as Error).message);
} }
try { try {
console.log("[worker] Creating DALPURI_INCREMENTAL_SYNC queue...");
await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC); await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC);
} catch { console.log("[worker] DALPURI_INCREMENTAL_SYNC queue ready");
// Queue may already exist; ignore to keep this idempotent. } catch (err) {
console.log("[worker] DALPURI_INCREMENTAL_SYNC queue already exists (or error):", (err as Error).message);
} }
try { try {
console.log("[worker] Creating REFRESH_SALES_METRICS queue...");
await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS); await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS);
} catch { console.log("[worker] REFRESH_SALES_METRICS queue ready");
// Queue may already exist; ignore to keep this idempotent. } catch (err) {
console.log("[worker] REFRESH_SALES_METRICS queue already exists (or error):", (err as Error).message);
} }
} }
@@ -138,14 +138,6 @@ export async function initializeWorkerSystem(): Promise<void> {
console.log("[worker] Worker system initialized - ready for job enqueueing"); console.log("[worker] Worker system initialized - ready for job enqueueing");
} }
/**
* Get the PgBoss instance for direct job enqueueing.
* Must call initializeWorkerSystem() first.
*/
export function getBoss(): PgBoss {
return boss;
}
if (import.meta.main) { if (import.meta.main) {
// if (Bun.env.NODE_ENV === "development") { // if (Bun.env.NODE_ENV === "development") {
// setupEventDebugger({ processLabel: "WORKER" }); // setupEventDebugger({ processLabel: "WORKER" });
@@ -155,14 +147,33 @@ if (import.meta.main) {
console.log( console.log(
`[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}` `[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}`
); );
console.log(`[worker] DATABASE_URL set: ${!!process.env.DATABASE_URL}`);
// Ensure PgBoss is connected and queues exist // Ensure PgBoss is connected and queues exist
await ensureBossStarted(); console.log("[worker] Starting PgBoss...");
try {
await Promise.race([
ensureBossStarted(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("boss.start() timed out after 30s")), 30_000)
),
]);
} catch (err) {
console.error("[worker] FATAL: PgBoss failed to start:", err);
process.exit(1);
}
console.log("[worker] PgBoss started successfully");
console.log("[worker] Ensuring sync queues...");
await ensureDalpuriSyncQueue(); await ensureDalpuriSyncQueue();
console.log("[worker] Sync queues ready");
// Register job handler for DALPURI_FULL_SYNC // Register job handler for DALPURI_FULL_SYNC
console.log("[worker] Importing sync-manager...");
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
console.log("[worker] Importing dalpuri-sync...");
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
console.log("[worker] Importing incremental-sync...");
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
const socket = await ensureManagerSocketReady(); const socket = await ensureManagerSocketReady();
await enqueueDalpuriFullSync(socket); await enqueueDalpuriFullSync(socket);
@@ -170,10 +181,37 @@ if (import.meta.main) {
console.log("[worker] Registered DALPURI_FULL_SYNC job handler"); console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
await executeIncrementalSync(); const startedAt = Date.now();
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
try {
await executeIncrementalSync();
console.log(
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
);
} catch (err) {
console.error(
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
err
);
throw err;
}
}); });
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler"); console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
const enqueueIncrementalWithLogging = () => {
enqueueIncrementalSync().catch((err) => {
console.error(
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
);
});
};
// Keep a worker-local 5s scheduler so incremental sync continues even when
// API interval scheduling is unavailable.
enqueueIncrementalWithLogging();
setInterval(enqueueIncrementalWithLogging, 5_000);
console.log("[worker] Started 5-second incremental enqueue interval");
// Register job handler for REFRESH_SALES_METRICS // Register job handler for REFRESH_SALES_METRICS
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics"); const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => { await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
+2 -2
View File
@@ -621,7 +621,7 @@
"@types/aria-query": ["@types/aria-query@5.0.4", "", {}, "sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw=="], "@types/aria-query": ["@types/aria-query@5.0.4", "", {}, "sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw=="],
"@types/bun": ["@types/bun@1.3.11", "", { "dependencies": { "bun-types": "1.3.11" } }, "sha512-5vPne5QvtpjGpsGYXiFyycfpDF2ECyPcTSsFBMa0fraoxiQyMJ3SmuQIGhzPg2WJuWxVBoxWJ2kClYTcw/4fAg=="], "@types/bun": ["@types/bun@1.3.12", "", { "dependencies": { "bun-types": "1.3.12" } }, "sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A=="],
"@types/cacheable-request": ["@types/cacheable-request@6.0.3", "", { "dependencies": { "@types/http-cache-semantics": "*", "@types/keyv": "^3.1.4", "@types/node": "*", "@types/responselike": "^1.0.0" } }, "sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw=="], "@types/cacheable-request": ["@types/cacheable-request@6.0.3", "", { "dependencies": { "@types/http-cache-semantics": "*", "@types/keyv": "^3.1.4", "@types/node": "*", "@types/responselike": "^1.0.0" } }, "sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw=="],
@@ -841,7 +841,7 @@
"buffer-from": ["buffer-from@1.1.2", "", {}, "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ=="], "buffer-from": ["buffer-from@1.1.2", "", {}, "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ=="],
"bun-types": ["bun-types@1.3.11", "", { "dependencies": { "@types/node": "*" } }, "sha512-1KGPpoxQWl9f6wcZh57LvrPIInQMn2TQ7jsgxqpRzg+l0QPOFvJVH7HmvHo/AiPgwXy+/Thf6Ov3EdVn1vOabg=="], "bun-types": ["bun-types@1.3.12", "", { "dependencies": { "@types/node": "*" } }, "sha512-HqOLj5PoFajAQciOMRiIZGNoKxDJSr6qigAttOX40vJuSp6DN/CxWp9s3C1Xwm4oH7ybueITwiaOcWXoYVoRkA=="],
"bundle-name": ["bundle-name@4.1.0", "", { "dependencies": { "run-applescript": "^7.0.0" } }, "sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q=="], "bundle-name": ["bundle-name@4.1.0", "", { "dependencies": { "run-applescript": "^7.0.0" } }, "sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q=="],
+6 -4
View File
@@ -8,7 +8,7 @@ metadata:
spec: spec:
backoffLimit: 0 backoffLimit: 0
ttlSecondsAfterFinished: 86400 ttlSecondsAfterFinished: 86400
activeDeadlineSeconds: 1800 activeDeadlineSeconds: 7200
template: template:
metadata: metadata:
labels: labels:
@@ -17,10 +17,12 @@ spec:
containers: containers:
- name: sync - name: sync
image: ghcr.io/horizonstacksoftware/optima-dalpuri-sync:RELEASE_TAG image: ghcr.io/horizonstacksoftware/optima-dalpuri-sync:RELEASE_TAG
envFrom:
- secretRef:
name: api-env-secret
env: env:
- name: CW_DATABASE_URL
valueFrom:
secretKeyRef:
name: api-env-secret
key: CW_DATABASE_URL
- name: API_DATABASE_URL - name: API_DATABASE_URL
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:
+27
View File
@@ -294,6 +294,22 @@ const refreshContextFromApi = async (
} }
} }
const cwMembers = await apiPrisma.cwMember.findMany({
select: { cwMemberId: true, identifier: true },
});
for (const member of cwMembers) {
if (
member.cwMemberId != null &&
member.identifier &&
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
) {
context.userIdentifiersByMemberRecId.set(
member.cwMemberId,
member.identifier
);
}
}
for (const board of boards) { for (const board of boards) {
context.serviceTicketBoardUidsById.set(board.id, board.uid); context.serviceTicketBoardUidsById.set(board.id, board.uid);
} }
@@ -426,6 +442,12 @@ const sanitizeModelData = (
) { ) {
sanitized.statusId = null; sanitized.statusId = null;
} }
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
} }
if (targetModel === "schedule") { if (targetModel === "schedule") {
@@ -734,6 +756,11 @@ const getConfigForTable = (table: string): SyncTableConfig | null => {
secondarySalesFlag: true, secondarySalesFlag: true,
}, },
}, },
soOppStatus: {
select: {
closedFlag: true,
},
},
}, },
}, },
}, },
+216 -19
View File
@@ -75,6 +75,136 @@ type DeleteResult = {
let incrementalDeleteStepIndex = 0; let incrementalDeleteStepIndex = 0;
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]);
const CRITICAL_CW_WATERMARK_TABLES = new Set([
"Companies",
"Company Addresses",
"Contacts",
"Opportunities",
]);
const criticalFullSyncIntervalMinutes = Math.max(
1,
Number.parseInt(
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
10
) || 15
);
const CRITICAL_FULL_SYNC_INTERVAL_MS =
criticalFullSyncIntervalMinutes * 60 * 1000;
const criticalCwWatermarkOverlapSeconds = Math.max(
5,
Number.parseInt(
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
10
) || 60
);
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
criticalCwWatermarkOverlapSeconds * 1000;
const criticalCwDeltaLimit = Math.max(
100,
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
5000
);
const lastCriticalFullSyncByStep = new Map<string, number>();
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
const shouldForceCriticalFullSync = (
step: Step,
forceIncremental: boolean
): boolean => {
if (!forceIncremental) return false;
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
const now = Date.now();
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
return false;
}
lastCriticalFullSyncByStep.set(step.name, now);
return true;
};
const computeCriticalCwWatermarkDecision = async (
cwPrisma: CwPrismaClient,
step: Step,
forceIncremental: boolean
): Promise<SmartSyncDecision | null> => {
if (!forceIncremental) return null;
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
const cwDelegate = (
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
)[step.sourceModel];
if (!cwDelegate) {
return null;
}
const existingWhere =
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
const lowerBound = lastWatermark
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
const rows = (await cwDelegate.findMany({
select: {
[step.sourceIdField]: true,
[step.sourceUpdatedField]: true,
},
where: {
...(existingWhere as Record<string, unknown>),
[step.sourceUpdatedField]: {
gte: lowerBound,
},
},
orderBy: { [step.sourceUpdatedField]: "asc" },
take: criticalCwDeltaLimit,
})) as Row[];
if (rows.length >= criticalCwDeltaLimit) {
console.warn(
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
);
return { mode: "full", differences: [] };
}
if (rows.length > 0) {
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
if (latest) {
lastCriticalCwWatermarkByStep.set(step.name, latest);
}
} else if (!lastWatermark) {
lastCriticalCwWatermarkByStep.set(step.name, new Date());
}
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
console.log(
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
);
return {
mode: "incremental",
sourceIds,
differences: [],
};
};
const parseEnvFile = (path: string): Record<string, string> => { const parseEnvFile = (path: string): Record<string, string> => {
const envData = readFileSync(path, "utf8"); const envData = readFileSync(path, "utf8");
const out: Record<string, string> = {}; const out: Record<string, string> = {};
@@ -107,6 +237,20 @@ const resolveApiDatabaseUrl = (): string => {
if (process.env.OPTIMA_API_DATABASE_URL) if (process.env.OPTIMA_API_DATABASE_URL)
return process.env.OPTIMA_API_DATABASE_URL; return process.env.OPTIMA_API_DATABASE_URL;
// Worker/runtime fallback:
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
return process.env.DATABASE_URL;
}
if (
process.env.DATABASE_URL &&
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
) {
return process.env.DATABASE_URL;
}
const candidates = [ const candidates = [
resolve(import.meta.dir, "../../api/.env"), resolve(import.meta.dir, "../../api/.env"),
resolve(process.cwd(), "../api/.env"), resolve(process.cwd(), "../api/.env"),
@@ -323,6 +467,22 @@ const refreshContextFromApi = async (
} }
} }
const cwMembers = await apiPrisma.cwMember.findMany({
select: { cwMemberId: true, identifier: true },
});
for (const member of cwMembers) {
if (
member.cwMemberId != null &&
member.identifier &&
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
) {
context.userIdentifiersByMemberRecId.set(
member.cwMemberId,
member.identifier
);
}
}
for (const board of boards) { for (const board of boards) {
context.serviceTicketBoardUidsById.set(board.id, board.uid); context.serviceTicketBoardUidsById.set(board.id, board.uid);
} }
@@ -636,6 +796,13 @@ const sanitizeModelData = (
) { ) {
sanitized.stageId = null; sanitized.stageId = null;
} }
// Nullify locationId if the corporate location doesn't exist
if (
sanitized.locationId != null &&
!context.corporateLocationIds.has(sanitized.locationId as number)
) {
sanitized.locationId = null;
}
// Nullify taxCodeId if the tax code hasn't synced yet // Nullify taxCodeId if the tax code hasn't synced yet
if ( if (
sanitized.taxCodeId != null && sanitized.taxCodeId != null &&
@@ -1328,6 +1495,15 @@ export const executeFullDalpuriSync = async (options?: {
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs; const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
const steps: Step[] = [ const steps: Step[] = [
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Users", name: "Users",
sourceModel: "member", sourceModel: "member",
@@ -1342,15 +1518,6 @@ export const executeFullDalpuriSync = async (options?: {
}, },
}, },
}, },
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Companies", name: "Companies",
sourceModel: "company", sourceModel: "company",
@@ -1585,6 +1752,11 @@ export const executeFullDalpuriSync = async (options?: {
secondarySalesFlag: true, secondarySalesFlag: true,
}, },
}, },
soOppStatus: {
select: {
closedFlag: true,
},
},
}, },
}, },
}, },
@@ -1729,19 +1901,40 @@ export const executeFullDalpuriSync = async (options?: {
step, step,
forceIncremental forceIncremental
); );
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
cwPrisma,
step,
forceIncremental
);
const forceCriticalFullSync = shouldForceCriticalFullSync(
step,
forceIncremental
);
const effectiveDecision = forceCriticalFullSync
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
: criticalWatermarkDecision ?? decision;
if (forceCriticalFullSync) {
console.log(
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
);
}
const sourceIdsFilter = const sourceIdsFilter =
decision.mode === "incremental" ? decision.sourceIds : undefined; effectiveDecision.mode === "incremental"
? effectiveDecision.sourceIds
: undefined;
console.log( console.log(
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${ ` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
decision.mode effectiveDecision.mode
}${ }${
decision.mode === "incremental" effectiveDecision.mode === "incremental"
? ` (${decision.sourceIds.length} ids)` ? ` (${effectiveDecision.sourceIds.length} ids)`
: "" : ""
}` }`
); );
if (logAllDifferences) { if (logAllDifferences) {
logAllSmartSyncDifferences(step, decision.differences); logAllSmartSyncDifferences(step, effectiveDecision.differences);
} }
const result = await syncStep( const result = await syncStep(
cwPrisma, cwPrisma,
@@ -1763,7 +1956,7 @@ export const executeFullDalpuriSync = async (options?: {
await writeStepLog( await writeStepLog(
step.name, step.name,
decision.mode, effectiveDecision.mode,
result, result,
{ deleted: 0, failed: 0 }, { deleted: 0, failed: 0 },
Date.now() - stepStart Date.now() - stepStart
@@ -1860,8 +2053,12 @@ export const executeForcedIncrementalDalpuriSync = async (options?: {
}; };
if (import.meta.main) { if (import.meta.main) {
executeFullDalpuriSync().catch((error) => { executeFullDalpuriSync()
console.error("CW -> API sync failed:", error); .then(() => {
process.exit(1); process.exit(0);
}); })
.catch((error) => {
console.error("CW -> API sync failed:", error);
process.exit(1);
});
} }
+1 -1
View File
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
{ {
from: "lastName", from: "lastName",
to: "lastName", to: "lastName",
process: (value) => (value ? value : "Contact"), process: (value) => (value ? value : ""),
}, },
{ from: "nickName", to: "nickname" }, { from: "nickName", to: "nickname" },
{ from: "title", to: "title" }, { from: "title", to: "title" },
+10 -1
View File
@@ -1,6 +1,7 @@
import { import {
Opportunity as CwOpportunity, Opportunity as CwOpportunity,
OpportunityMember as CwOpportunityMember, OpportunityMember as CwOpportunityMember,
SoOppStatus as CwSoOppStatus,
} from "../../generated/prisma/client"; } from "../../generated/prisma/client";
import { OpportunityInterest } from "../../../api/generated/prisma/client"; import { OpportunityInterest } from "../../../api/generated/prisma/client";
import { Translation, skipRow } from "./types"; import { Translation, skipRow } from "./types";
@@ -30,6 +31,7 @@ type ApiOpportunityRecord = {
dateBecameLead?: Date | null; dateBecameLead?: Date | null;
closedDate?: Date | null; closedDate?: Date | null;
closedFlag: boolean; closedFlag: boolean;
locationId?: number | null;
closedById?: string | null; closedById?: string | null;
updatedBy: string; updatedBy: string;
eneteredBy: string; eneteredBy: string;
@@ -42,6 +44,7 @@ type CwOpportunityWithMembers = CwOpportunity & {
CwOpportunityMember, CwOpportunityMember,
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag" "memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
>[]; >[];
soOppStatus?: Pick<CwSoOppStatus, "closedFlag"> | null;
}; };
const toInterest = (value: number | null): OpportunityInterest | null => { const toInterest = (value: number | null): OpportunityInterest | null => {
@@ -119,13 +122,19 @@ export const opportunityTranslation: Translation<
}, },
{ from: "companyRecId", to: "companyId" }, { from: "companyRecId", to: "companyId" },
{ from: "contactRecId", to: "contactId" }, { from: "contactRecId", to: "contactId" },
{ from: "ownerLevelRecId", to: "locationId" },
{ from: "companyAddressRecId", to: "siteId" }, { from: "companyAddressRecId", to: "siteId" },
{ from: "poNumber", to: "customerPO" }, { from: "poNumber", to: "customerPO" },
{ from: "dateCloseExpected", to: "expectedCloseDate" }, { from: "dateCloseExpected", to: "expectedCloseDate" },
{ from: "datePipelineChange", to: "pipelineChangeDate" }, { from: "datePipelineChange", to: "pipelineChangeDate" },
{ from: "dateBecameLead", to: "dateBecameLead" }, { from: "dateBecameLead", to: "dateBecameLead" },
{ from: "dateClosed", to: "closedDate" }, { from: "dateClosed", to: "closedDate" },
{ from: "oldCloseFlag", to: "closedFlag" }, {
from: "oldCloseFlag",
to: "closedFlag",
process: (_value, _context, row) =>
row.soOppStatus?.closedFlag ?? row.oldCloseFlag ?? false,
},
{ from: "closedBy", to: "closedById" }, { from: "closedBy", to: "closedById" },
{ {
from: "updatedBy", from: "updatedBy",