Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2bd498a35d | |||
| 86d7426e8b | |||
| afe56393e7 | |||
| b2cd26af30 | |||
| 0594816ea4 | |||
| 71fe36c0b8 | |||
| e0d575454e | |||
| 32bba31e72 | |||
| 1233535b20 | |||
| 2c737b22f1 | |||
| a3bfe9f374 | |||
| a106bb15a8 | |||
| d9a431d99a | |||
| 83377a7d0d | |||
| a81618007c | |||
| f56c49e242 |
@@ -130,6 +130,17 @@ jobs:
|
||||
ghcr.io/horizonstacksoftware/optima-api-migrate:latest
|
||||
ghcr.io/horizonstacksoftware/optima-api-migrate:${{ github.event.release.tag_name }}
|
||||
|
||||
- name: Build and push the dalpuri sync image
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
file: api/Dockerfile
|
||||
push: true
|
||||
target: dalpuri-sync
|
||||
tags: |
|
||||
ghcr.io/horizonstacksoftware/optima-dalpuri-sync:latest
|
||||
ghcr.io/horizonstacksoftware/optima-dalpuri-sync:${{ github.event.release.tag_name }}
|
||||
|
||||
build-worker:
|
||||
name: Build - Worker
|
||||
needs: [test-api, test-dalpuri, test-ui]
|
||||
@@ -220,9 +231,10 @@ jobs:
|
||||
run: bun install --frozen-lockfile
|
||||
|
||||
- name: Rebuild native modules
|
||||
run: npm rebuild
|
||||
run: npm rebuild --ignore-scripts
|
||||
env:
|
||||
HUSKY: "0"
|
||||
HUSKY_SKIP_INSTALL: "1"
|
||||
|
||||
- name: Build macOS distributables
|
||||
run: bun run make:macos
|
||||
@@ -261,9 +273,10 @@ jobs:
|
||||
run: bun install --frozen-lockfile
|
||||
|
||||
- name: Rebuild native modules
|
||||
run: npm rebuild
|
||||
run: npm rebuild --ignore-scripts
|
||||
env:
|
||||
HUSKY: "0"
|
||||
HUSKY_SKIP_INSTALL: "1"
|
||||
|
||||
- name: Build Windows distributables
|
||||
run: bun run make -- --platform win32
|
||||
@@ -276,6 +289,55 @@ jobs:
|
||||
files: |
|
||||
ui/out/make/**/*.exe
|
||||
|
||||
# Runs a full CW → API data sync as a Kubernetes Job (the CW MSSQL and
|
||||
# API Postgres addresses are internal to the cluster and unreachable from
|
||||
# GitHub-hosted runners). Waits for both images to be built first and
|
||||
# must succeed before either the API or worker deploys.
|
||||
sync-cw-to-api:
|
||||
name: Sync - CW to API
|
||||
needs: [migrate-api, build-worker]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set the Kubernetes context
|
||||
uses: azure/k8s-set-context@v2
|
||||
with:
|
||||
method: kubeconfig
|
||||
kubeconfig: ${{ secrets.KUBECONFIG }}
|
||||
|
||||
- name: Checkout source code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Delete previous sync job if exists
|
||||
run: kubectl delete job -n optima -l app=dalpuri-sync --ignore-not-found
|
||||
|
||||
- name: Apply sync job
|
||||
run: |
|
||||
TAG=${{ github.event.release.tag_name }}
|
||||
sed "s/RELEASE_TAG/${TAG}/g" dalpuri/kubernetes/sync-job.yaml | kubectl apply -f -
|
||||
|
||||
- name: Wait for sync to complete
|
||||
run: |
|
||||
TAG=${{ github.event.release.tag_name }}
|
||||
JOB="job/dalpuri-sync-${TAG}"
|
||||
|
||||
kubectl wait --for=condition=complete --timeout=7200s -n optima "$JOB" &
|
||||
WAIT_COMPLETE=$!
|
||||
kubectl wait --for=condition=failed --timeout=7200s -n optima "$JOB" &
|
||||
WAIT_FAILED=$!
|
||||
|
||||
wait -n $WAIT_COMPLETE $WAIT_FAILED
|
||||
|
||||
echo "--- Sync job logs ---"
|
||||
kubectl logs -n optima "$JOB" --tail=500 || true
|
||||
|
||||
if kubectl get -n optima "$JOB" -o jsonpath='{.status.conditions[?(@.type=="Complete")].status}' | grep -q "True"; then
|
||||
echo "Sync completed successfully."
|
||||
exit 0
|
||||
else
|
||||
echo "Sync FAILED."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# ==========================================================================
|
||||
# Deploy jobs
|
||||
# ==========================================================================
|
||||
@@ -332,7 +394,7 @@ jobs:
|
||||
|
||||
deploy-api:
|
||||
name: Deploy - API
|
||||
needs: [migrate-api]
|
||||
needs: [migrate-api, sync-cw-to-api]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set the Kubernetes context
|
||||
@@ -402,7 +464,7 @@ jobs:
|
||||
|
||||
deploy-worker:
|
||||
name: Deploy - Worker
|
||||
needs: [build-worker]
|
||||
needs: [build-worker, sync-cw-to-api]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set the Kubernetes context
|
||||
|
||||
+34
-1
@@ -90,6 +90,9 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
|
||||
# Copy production node_modules (Prisma adapter needs native bindings)
|
||||
COPY --from=deps /app/node_modules/ ./node_modules/
|
||||
|
||||
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
|
||||
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
|
||||
|
||||
ENV NODE_ENV=production
|
||||
|
||||
# ---- Stage 4: API server runtime image ----
|
||||
@@ -133,4 +136,34 @@ COPY api/prisma.config.ts ./api/prisma.config.ts
|
||||
RUN chmod +x /app/api/prisma/migrate-entrypoint.sh
|
||||
|
||||
WORKDIR /app/api
|
||||
CMD ["sh", "prisma/migrate-entrypoint.sh"]
|
||||
CMD ["sh", "prisma/migrate-entrypoint.sh"]
|
||||
|
||||
# ---- Stage 7: Dalpuri CW-to-API sync runner ----
|
||||
FROM oven/bun:1.3.11 AS dalpuri-sync
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY package.json bun.lock ./
|
||||
COPY api/package.json ./api/package.json
|
||||
COPY dalpuri/package.json ./dalpuri/package.json
|
||||
COPY ui/package.json ./ui/package.json
|
||||
COPY patches ./patches
|
||||
|
||||
RUN bun install --frozen-lockfile
|
||||
|
||||
COPY dalpuri/src/ ./dalpuri/src/
|
||||
COPY dalpuri/prisma/ ./dalpuri/prisma/
|
||||
COPY dalpuri/prisma.config.ts ./dalpuri/prisma.config.ts
|
||||
|
||||
COPY api/prisma/ ./api/prisma/
|
||||
COPY api/prisma.config.ts ./api/prisma.config.ts
|
||||
|
||||
WORKDIR /app/dalpuri
|
||||
RUN DATABASE_URL="sqlserver://localhost:1433;database=dummy;user=dummy;password=dummy;trustServerCertificate=true" \
|
||||
bunx prisma generate
|
||||
|
||||
WORKDIR /app/api
|
||||
RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
|
||||
|
||||
WORKDIR /app/dalpuri
|
||||
CMD ["bun", "run", "src/sync.ts"]
|
||||
@@ -20,6 +20,11 @@ spec:
|
||||
env:
|
||||
- name: MANAGER_SOCKET_URL
|
||||
value: "http://optima-api.optima.svc.cluster.local:8671"
|
||||
- name: API_DATABASE_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: api-env-secret
|
||||
key: DATABASE_URL
|
||||
envFrom:
|
||||
- secretRef:
|
||||
name: api-env-secret
|
||||
|
||||
@@ -26,8 +26,11 @@ while [ $ATTEMPT -lt $MAX_RETRIES ]; do
|
||||
# P3009: a previously-failed migration is blocking deploy.
|
||||
# The error message contains the migration name in backticks:
|
||||
# The `20260402000000_fix_severity_typo` migration started at ... failed
|
||||
if echo "$DEPLOY_OUTPUT" | grep -q "P3009"; then
|
||||
FAILED=$(echo "$DEPLOY_OUTPUT" | grep -oE '\`[0-9]{14}(_[a-zA-Z_]+)?\`' | tr -d '\`' | head -1)
|
||||
# Strip ANSI escape codes first (Prisma may colorize output even without TTY),
|
||||
# then use a simple backtick-content regex rather than a rigid format match.
|
||||
CLEAN_OUTPUT=$(printf '%s\n' "$DEPLOY_OUTPUT" | sed 's/\x1b\[[0-9;]*[mGKHFJr]//g')
|
||||
if printf '%s\n' "$CLEAN_OUTPUT" | grep -q "P3009"; then
|
||||
FAILED=$(printf '%s\n' "$CLEAN_OUTPUT" | grep -o '`[^`]*`' | grep '[0-9]' | tr -d '`' | head -1)
|
||||
if [ -n "$FAILED" ]; then
|
||||
echo "[migrate] Resolving failed migration as rolled-back: $FAILED"
|
||||
RESOLVE_OUTPUT=""
|
||||
|
||||
@@ -210,6 +210,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS "CatalogItem_id_key" ON "CatalogItem"("id");
|
||||
|
||||
-- =============================================================================
|
||||
-- SECTION 4: Company — change id TEXT→INTEGER, add uid PK, add columns
|
||||
-- Production has ~4500 rows with CUID text PKs and cw_CompanyId integers
|
||||
-- that must be preserved as uid and id respectively.
|
||||
-- =============================================================================
|
||||
|
||||
-- Drop FKs that reference Company by old id
|
||||
@@ -229,12 +231,18 @@ DROP INDEX IF EXISTS "Company_cw_CompanyId_key";
|
||||
DROP INDEX IF EXISTS "Company_cw_Identifier_key";
|
||||
|
||||
DO $$ BEGIN
|
||||
-- Add uid PK column if missing
|
||||
-- Step 1: Add uid as NULLABLE (no default) so existing rows stay NULL temporarily
|
||||
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'Company' AND column_name = 'uid') THEN
|
||||
ALTER TABLE "Company" ADD COLUMN "uid" TEXT NOT NULL DEFAULT '';
|
||||
ALTER TABLE "Company" ADD COLUMN "uid" TEXT;
|
||||
END IF;
|
||||
|
||||
-- Swap PK from id to uid
|
||||
-- Step 2: Populate uid from the old text PK (old id was a CUID — it becomes uid)
|
||||
UPDATE "Company" SET "uid" = "id" WHERE "uid" IS NULL;
|
||||
|
||||
-- Step 3: Now make uid NOT NULL (all rows are populated)
|
||||
ALTER TABLE "Company" ALTER COLUMN "uid" SET NOT NULL;
|
||||
|
||||
-- Step 4: Swap PK from id (text) to uid (text)
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM information_schema.table_constraints tc
|
||||
JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name
|
||||
@@ -244,7 +252,8 @@ DO $$ BEGIN
|
||||
ALTER TABLE "Company" ADD CONSTRAINT "Company_pkey" PRIMARY KEY ("uid");
|
||||
END IF;
|
||||
|
||||
-- Change id from TEXT to INTEGER
|
||||
-- Step 5: Change id from TEXT to INTEGER
|
||||
-- NOTE: do this BEFORE dropping cw_CompanyId so we can populate from it below
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM information_schema.columns
|
||||
WHERE table_name = 'Company' AND column_name = 'id' AND data_type = 'text'
|
||||
@@ -253,7 +262,12 @@ DO $$ BEGIN
|
||||
ALTER TABLE "Company" ADD COLUMN "id" INTEGER;
|
||||
END IF;
|
||||
|
||||
-- Drop old CW-specific columns
|
||||
-- Step 6: Populate new integer id from cw_CompanyId (CW integer company id)
|
||||
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'Company' AND column_name = 'cw_CompanyId') THEN
|
||||
UPDATE "Company" SET "id" = "cw_CompanyId" WHERE "id" IS NULL;
|
||||
END IF;
|
||||
|
||||
-- Step 7: Drop old CW-specific columns (data now in id and uid)
|
||||
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'Company' AND column_name = 'cw_CompanyId') THEN
|
||||
ALTER TABLE "Company" DROP COLUMN "cw_CompanyId";
|
||||
END IF;
|
||||
@@ -261,7 +275,7 @@ DO $$ BEGIN
|
||||
ALTER TABLE "Company" DROP COLUMN "cw_Identifier";
|
||||
END IF;
|
||||
|
||||
-- Add new columns
|
||||
-- Step 8: Add new columns
|
||||
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'Company' AND column_name = 'dateDeleted') THEN
|
||||
ALTER TABLE "Company" ADD COLUMN "dateDeleted" TIMESTAMP(3);
|
||||
END IF;
|
||||
@@ -291,10 +305,15 @@ DO $$ BEGIN
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
-- Make Company.id NOT NULL (all rows were populated from cw_CompanyId above)
|
||||
ALTER TABLE "Company" ALTER COLUMN "id" SET NOT NULL;
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS "Company_id_key" ON "Company"("id");
|
||||
|
||||
-- =============================================================================
|
||||
-- SECTION 5: UnifiSite — change companyId from TEXT to INTEGER
|
||||
-- SECTION 5: UnifiSite — change companyId from TEXT to INTEGER (data migration)
|
||||
-- Production has ~180 rows where companyId (text) = Company.uid (the old text
|
||||
-- PK that was copied into uid in Section 4). We join on Company.uid to get
|
||||
-- the new integer Company.id and preserve the relationship.
|
||||
-- =============================================================================
|
||||
|
||||
DO $$ BEGIN
|
||||
@@ -302,8 +321,24 @@ DO $$ BEGIN
|
||||
SELECT 1 FROM information_schema.columns
|
||||
WHERE table_name = 'UnifiSite' AND column_name = 'companyId' AND data_type = 'text'
|
||||
) THEN
|
||||
-- Add a temporary integer column to hold the mapped value
|
||||
IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'UnifiSite' AND column_name = 'companyId_int') THEN
|
||||
ALTER TABLE "UnifiSite" ADD COLUMN "companyId_int" INTEGER;
|
||||
END IF;
|
||||
-- Map old text companyId (= Company.uid) → new integer Company.id
|
||||
UPDATE "UnifiSite" us
|
||||
SET "companyId_int" = c."id"
|
||||
FROM "Company" c
|
||||
WHERE c."uid" = us."companyId";
|
||||
-- Replace old text column with the populated integer column
|
||||
ALTER TABLE "UnifiSite" DROP COLUMN "companyId";
|
||||
ALTER TABLE "UnifiSite" ADD COLUMN "companyId" INTEGER;
|
||||
ALTER TABLE "UnifiSite" RENAME COLUMN "companyId_int" TO "companyId";
|
||||
ELSIF EXISTS (
|
||||
SELECT 1 FROM information_schema.columns
|
||||
WHERE table_name = 'UnifiSite' AND column_name = 'companyId_int'
|
||||
) THEN
|
||||
-- Edge case: int column added but not renamed (interrupted previous run)
|
||||
ALTER TABLE "UnifiSite" RENAME COLUMN "companyId_int" TO "companyId";
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -1467,7 +1502,8 @@ ALTER TABLE "CatalogItem" ALTER COLUMN "uid" DROP DEFAULT;
|
||||
ALTER TABLE "CatalogItem" ALTER COLUMN "id" SET NOT NULL;
|
||||
ALTER TABLE "CatalogItem" ALTER COLUMN "subcategoryId" DROP DEFAULT;
|
||||
|
||||
-- Company: drop defaults, enforce NOT NULL
|
||||
-- Company: uid was added nullable (no default), id was made NOT NULL in Section 4.
|
||||
-- These are no-ops but kept for safety on fresh DBs.
|
||||
ALTER TABLE "Company" ALTER COLUMN "uid" DROP DEFAULT;
|
||||
ALTER TABLE "Company" ALTER COLUMN "id" SET NOT NULL;
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import PdfPrinter from "pdfmake/src/Printer";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { existsSync, readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
export interface QuoteLineItem {
|
||||
@@ -110,7 +110,26 @@ const COMPANY = {
|
||||
|
||||
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
|
||||
|
||||
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto");
|
||||
function resolveRobotoFontDir(): string {
|
||||
const candidates = [
|
||||
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
|
||||
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
|
||||
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
|
||||
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
|
||||
];
|
||||
|
||||
for (const dir of candidates) {
|
||||
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
|
||||
return dir;
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
|
||||
);
|
||||
}
|
||||
|
||||
const fontDir = resolveRobotoFontDir();
|
||||
const fonts = {
|
||||
Roboto: {
|
||||
normal: join(fontDir, "Roboto-Regular.ttf"),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { Server } from "socket.io";
|
||||
import { events, EventTypes } from "../globalEvents";
|
||||
import { WorkerQueue } from "./queues";
|
||||
import { reserveWorkerId } from "../../workert";
|
||||
|
||||
function emitGlobalEvent<K extends keyof EventTypes>(
|
||||
name: K,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { Socket } from "socket.io-client";
|
||||
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
||||
import { prisma } from "../../constants";
|
||||
|
||||
/**
|
||||
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
||||
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
|
||||
* Called every 5 seconds via PgBoss from the API process interval.
|
||||
*/
|
||||
export async function executeIncrementalSync(): Promise<void> {
|
||||
return executeForcedIncrementalDalpuriSync();
|
||||
let jobRunId: string | undefined;
|
||||
|
||||
try {
|
||||
const run = await prisma.syncJobRun.create({
|
||||
data: {
|
||||
jobType: "INCREMENTAL_SYNC",
|
||||
status: "RUNNING",
|
||||
triggeredBy: "worker",
|
||||
startedAt: new Date(),
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
jobRunId = run.id;
|
||||
} catch (err) {
|
||||
// Sync should still run even if tracking insert fails.
|
||||
console.error("[sync] Failed to create incremental SyncJobRun", err);
|
||||
}
|
||||
|
||||
try {
|
||||
await executeForcedIncrementalDalpuriSync({ jobRunId });
|
||||
|
||||
if (jobRunId) {
|
||||
await prisma.syncJobRun.update({
|
||||
where: { id: jobRunId },
|
||||
data: {
|
||||
status: "COMPLETED",
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
if (jobRunId) {
|
||||
const errorSummary = err instanceof Error ? err.message : String(err);
|
||||
await prisma.syncJobRun
|
||||
.update({
|
||||
where: { id: jobRunId },
|
||||
data: {
|
||||
status: "FAILED",
|
||||
completedAt: new Date(),
|
||||
errorSummary: errorSummary.slice(0, 2000),
|
||||
},
|
||||
})
|
||||
.catch(() => {
|
||||
// Best-effort update only.
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
|
||||
* Called on an interval from the main API process so it survives worker restarts.
|
||||
*/
|
||||
export async function enqueueIncrementalSync(): Promise<void> {
|
||||
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {});
|
||||
const jobId = await getBoss().send(
|
||||
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
|
||||
{
|
||||
enqueuedAt: new Date().toISOString(),
|
||||
},
|
||||
{
|
||||
singletonKey: "dalpuri-incremental-sync",
|
||||
}
|
||||
);
|
||||
|
||||
if (!jobId) {
|
||||
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
|
||||
queueType: WorkerQueue,
|
||||
workFn: (workerSocket: Socket) => Promise<T>,
|
||||
): Promise<T> {
|
||||
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
// Request a worker ID and namespace from the manager
|
||||
socket.emit(
|
||||
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
|
||||
}
|
||||
|
||||
// Connect to the worker-specific namespace
|
||||
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, {
|
||||
const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
|
||||
reconnection: false,
|
||||
});
|
||||
|
||||
|
||||
+30
-2
@@ -163,17 +163,45 @@ if (import.meta.main) {
|
||||
// Register job handler for DALPURI_FULL_SYNC
|
||||
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
||||
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
||||
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
|
||||
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
||||
const socket = await ensureManagerSocketReady();
|
||||
await enqueueDalpuriFullSync();
|
||||
await enqueueDalpuriFullSync(socket);
|
||||
});
|
||||
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
||||
|
||||
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
||||
await executeIncrementalSync();
|
||||
const startedAt = Date.now();
|
||||
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
|
||||
try {
|
||||
await executeIncrementalSync();
|
||||
console.log(
|
||||
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
|
||||
);
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
|
||||
err
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
||||
|
||||
const enqueueIncrementalWithLogging = () => {
|
||||
enqueueIncrementalSync().catch((err) => {
|
||||
console.error(
|
||||
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
// Keep a worker-local 5s scheduler so incremental sync continues even when
|
||||
// API interval scheduling is unavailable.
|
||||
enqueueIncrementalWithLogging();
|
||||
setInterval(enqueueIncrementalWithLogging, 5_000);
|
||||
console.log("[worker] Started 5-second incremental enqueue interval");
|
||||
|
||||
// Register job handler for REFRESH_SALES_METRICS
|
||||
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
||||
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: dalpuri-sync-RELEASE_TAG
|
||||
namespace: optima
|
||||
labels:
|
||||
app: dalpuri-sync
|
||||
spec:
|
||||
backoffLimit: 0
|
||||
ttlSecondsAfterFinished: 86400
|
||||
activeDeadlineSeconds: 7200
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: dalpuri-sync
|
||||
spec:
|
||||
containers:
|
||||
- name: sync
|
||||
image: ghcr.io/horizonstacksoftware/optima-dalpuri-sync:RELEASE_TAG
|
||||
env:
|
||||
- name: CW_DATABASE_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: api-env-secret
|
||||
key: CW_DATABASE_URL
|
||||
- name: API_DATABASE_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: api-env-secret
|
||||
key: DATABASE_URL
|
||||
restartPolicy: Never
|
||||
imagePullSecrets:
|
||||
- name: github-container-registry
|
||||
@@ -294,6 +294,22 @@ const refreshContextFromApi = async (
|
||||
}
|
||||
}
|
||||
|
||||
const cwMembers = await apiPrisma.cwMember.findMany({
|
||||
select: { cwMemberId: true, identifier: true },
|
||||
});
|
||||
for (const member of cwMembers) {
|
||||
if (
|
||||
member.cwMemberId != null &&
|
||||
member.identifier &&
|
||||
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
|
||||
) {
|
||||
context.userIdentifiersByMemberRecId.set(
|
||||
member.cwMemberId,
|
||||
member.identifier
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
for (const board of boards) {
|
||||
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
||||
}
|
||||
@@ -426,6 +442,12 @@ const sanitizeModelData = (
|
||||
) {
|
||||
sanitized.statusId = null;
|
||||
}
|
||||
if (
|
||||
sanitized.locationId != null &&
|
||||
!context.corporateLocationIds.has(sanitized.locationId as number)
|
||||
) {
|
||||
sanitized.locationId = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (targetModel === "schedule") {
|
||||
@@ -734,6 +756,11 @@ const getConfigForTable = (table: string): SyncTableConfig | null => {
|
||||
secondarySalesFlag: true,
|
||||
},
|
||||
},
|
||||
soOppStatus: {
|
||||
select: {
|
||||
closedFlag: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
+216
-19
@@ -75,6 +75,136 @@ type DeleteResult = {
|
||||
|
||||
let incrementalDeleteStepIndex = 0;
|
||||
|
||||
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
"Opportunities",
|
||||
]);
|
||||
|
||||
const CRITICAL_CW_WATERMARK_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
"Opportunities",
|
||||
]);
|
||||
|
||||
const criticalFullSyncIntervalMinutes = Math.max(
|
||||
1,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
|
||||
10
|
||||
) || 15
|
||||
);
|
||||
|
||||
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||
|
||||
const criticalCwWatermarkOverlapSeconds = Math.max(
|
||||
5,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
|
||||
10
|
||||
) || 60
|
||||
);
|
||||
|
||||
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
|
||||
criticalCwWatermarkOverlapSeconds * 1000;
|
||||
|
||||
const criticalCwDeltaLimit = Math.max(
|
||||
100,
|
||||
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
|
||||
5000
|
||||
);
|
||||
|
||||
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
|
||||
|
||||
const shouldForceCriticalFullSync = (
|
||||
step: Step,
|
||||
forceIncremental: boolean
|
||||
): boolean => {
|
||||
if (!forceIncremental) return false;
|
||||
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
|
||||
|
||||
const now = Date.now();
|
||||
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
|
||||
|
||||
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
lastCriticalFullSyncByStep.set(step.name, now);
|
||||
return true;
|
||||
};
|
||||
|
||||
const computeCriticalCwWatermarkDecision = async (
|
||||
cwPrisma: CwPrismaClient,
|
||||
step: Step,
|
||||
forceIncremental: boolean
|
||||
): Promise<SmartSyncDecision | null> => {
|
||||
if (!forceIncremental) return null;
|
||||
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
|
||||
|
||||
const cwDelegate = (
|
||||
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
||||
)[step.sourceModel];
|
||||
|
||||
if (!cwDelegate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const existingWhere =
|
||||
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
|
||||
|
||||
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
|
||||
const lowerBound = lastWatermark
|
||||
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
|
||||
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
|
||||
|
||||
const rows = (await cwDelegate.findMany({
|
||||
select: {
|
||||
[step.sourceIdField]: true,
|
||||
[step.sourceUpdatedField]: true,
|
||||
},
|
||||
where: {
|
||||
...(existingWhere as Record<string, unknown>),
|
||||
[step.sourceUpdatedField]: {
|
||||
gte: lowerBound,
|
||||
},
|
||||
},
|
||||
orderBy: { [step.sourceUpdatedField]: "asc" },
|
||||
take: criticalCwDeltaLimit,
|
||||
})) as Row[];
|
||||
|
||||
if (rows.length >= criticalCwDeltaLimit) {
|
||||
console.warn(
|
||||
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
|
||||
);
|
||||
return { mode: "full", differences: [] };
|
||||
}
|
||||
|
||||
if (rows.length > 0) {
|
||||
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
|
||||
if (latest) {
|
||||
lastCriticalCwWatermarkByStep.set(step.name, latest);
|
||||
}
|
||||
} else if (!lastWatermark) {
|
||||
lastCriticalCwWatermarkByStep.set(step.name, new Date());
|
||||
}
|
||||
|
||||
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
|
||||
console.log(
|
||||
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
|
||||
);
|
||||
|
||||
return {
|
||||
mode: "incremental",
|
||||
sourceIds,
|
||||
differences: [],
|
||||
};
|
||||
};
|
||||
|
||||
const parseEnvFile = (path: string): Record<string, string> => {
|
||||
const envData = readFileSync(path, "utf8");
|
||||
const out: Record<string, string> = {};
|
||||
@@ -107,6 +237,20 @@ const resolveApiDatabaseUrl = (): string => {
|
||||
if (process.env.OPTIMA_API_DATABASE_URL)
|
||||
return process.env.OPTIMA_API_DATABASE_URL;
|
||||
|
||||
// Worker/runtime fallback:
|
||||
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
|
||||
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
|
||||
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
|
||||
return process.env.DATABASE_URL;
|
||||
}
|
||||
|
||||
if (
|
||||
process.env.DATABASE_URL &&
|
||||
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
|
||||
) {
|
||||
return process.env.DATABASE_URL;
|
||||
}
|
||||
|
||||
const candidates = [
|
||||
resolve(import.meta.dir, "../../api/.env"),
|
||||
resolve(process.cwd(), "../api/.env"),
|
||||
@@ -323,6 +467,22 @@ const refreshContextFromApi = async (
|
||||
}
|
||||
}
|
||||
|
||||
const cwMembers = await apiPrisma.cwMember.findMany({
|
||||
select: { cwMemberId: true, identifier: true },
|
||||
});
|
||||
for (const member of cwMembers) {
|
||||
if (
|
||||
member.cwMemberId != null &&
|
||||
member.identifier &&
|
||||
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
|
||||
) {
|
||||
context.userIdentifiersByMemberRecId.set(
|
||||
member.cwMemberId,
|
||||
member.identifier
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
for (const board of boards) {
|
||||
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
||||
}
|
||||
@@ -636,6 +796,13 @@ const sanitizeModelData = (
|
||||
) {
|
||||
sanitized.stageId = null;
|
||||
}
|
||||
// Nullify locationId if the corporate location doesn't exist
|
||||
if (
|
||||
sanitized.locationId != null &&
|
||||
!context.corporateLocationIds.has(sanitized.locationId as number)
|
||||
) {
|
||||
sanitized.locationId = null;
|
||||
}
|
||||
// Nullify taxCodeId if the tax code hasn't synced yet
|
||||
if (
|
||||
sanitized.taxCodeId != null &&
|
||||
@@ -1328,6 +1495,15 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
|
||||
|
||||
const steps: Step[] = [
|
||||
{
|
||||
name: "CW Members",
|
||||
sourceModel: "member",
|
||||
targetModel: "cwMember",
|
||||
translation: cwMemberTranslation as unknown as AnyTranslation,
|
||||
uniqueField: "cwMemberId",
|
||||
sourceIdField: "memberRecId",
|
||||
sourceUpdatedField: "lastUpdatedUtc",
|
||||
},
|
||||
{
|
||||
name: "Users",
|
||||
sourceModel: "member",
|
||||
@@ -1342,15 +1518,6 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "CW Members",
|
||||
sourceModel: "member",
|
||||
targetModel: "cwMember",
|
||||
translation: cwMemberTranslation as unknown as AnyTranslation,
|
||||
uniqueField: "cwMemberId",
|
||||
sourceIdField: "memberRecId",
|
||||
sourceUpdatedField: "lastUpdatedUtc",
|
||||
},
|
||||
{
|
||||
name: "Companies",
|
||||
sourceModel: "company",
|
||||
@@ -1585,6 +1752,11 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
secondarySalesFlag: true,
|
||||
},
|
||||
},
|
||||
soOppStatus: {
|
||||
select: {
|
||||
closedFlag: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1729,19 +1901,40 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
|
||||
cwPrisma,
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const effectiveDecision = forceCriticalFullSync
|
||||
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||
: criticalWatermarkDecision ?? decision;
|
||||
|
||||
if (forceCriticalFullSync) {
|
||||
console.log(
|
||||
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
|
||||
);
|
||||
}
|
||||
|
||||
const sourceIdsFilter =
|
||||
decision.mode === "incremental" ? decision.sourceIds : undefined;
|
||||
effectiveDecision.mode === "incremental"
|
||||
? effectiveDecision.sourceIds
|
||||
: undefined;
|
||||
console.log(
|
||||
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
||||
decision.mode
|
||||
effectiveDecision.mode
|
||||
}${
|
||||
decision.mode === "incremental"
|
||||
? ` (${decision.sourceIds.length} ids)`
|
||||
effectiveDecision.mode === "incremental"
|
||||
? ` (${effectiveDecision.sourceIds.length} ids)`
|
||||
: ""
|
||||
}`
|
||||
);
|
||||
if (logAllDifferences) {
|
||||
logAllSmartSyncDifferences(step, decision.differences);
|
||||
logAllSmartSyncDifferences(step, effectiveDecision.differences);
|
||||
}
|
||||
const result = await syncStep(
|
||||
cwPrisma,
|
||||
@@ -1763,7 +1956,7 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
|
||||
await writeStepLog(
|
||||
step.name,
|
||||
decision.mode,
|
||||
effectiveDecision.mode,
|
||||
result,
|
||||
{ deleted: 0, failed: 0 },
|
||||
Date.now() - stepStart
|
||||
@@ -1860,8 +2053,12 @@ export const executeForcedIncrementalDalpuriSync = async (options?: {
|
||||
};
|
||||
|
||||
if (import.meta.main) {
|
||||
executeFullDalpuriSync().catch((error) => {
|
||||
console.error("CW -> API sync failed:", error);
|
||||
process.exit(1);
|
||||
});
|
||||
executeFullDalpuriSync()
|
||||
.then(() => {
|
||||
process.exit(0);
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error("CW -> API sync failed:", error);
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import {
|
||||
Opportunity as CwOpportunity,
|
||||
OpportunityMember as CwOpportunityMember,
|
||||
SoOppStatus as CwSoOppStatus,
|
||||
} from "../../generated/prisma/client";
|
||||
import { OpportunityInterest } from "../../../api/generated/prisma/client";
|
||||
import { Translation, skipRow } from "./types";
|
||||
@@ -30,6 +31,7 @@ type ApiOpportunityRecord = {
|
||||
dateBecameLead?: Date | null;
|
||||
closedDate?: Date | null;
|
||||
closedFlag: boolean;
|
||||
locationId?: number | null;
|
||||
closedById?: string | null;
|
||||
updatedBy: string;
|
||||
eneteredBy: string;
|
||||
@@ -42,6 +44,7 @@ type CwOpportunityWithMembers = CwOpportunity & {
|
||||
CwOpportunityMember,
|
||||
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
|
||||
>[];
|
||||
soOppStatus?: Pick<CwSoOppStatus, "closedFlag"> | null;
|
||||
};
|
||||
|
||||
const toInterest = (value: number | null): OpportunityInterest | null => {
|
||||
@@ -119,13 +122,19 @@ export const opportunityTranslation: Translation<
|
||||
},
|
||||
{ from: "companyRecId", to: "companyId" },
|
||||
{ from: "contactRecId", to: "contactId" },
|
||||
{ from: "ownerLevelRecId", to: "locationId" },
|
||||
{ from: "companyAddressRecId", to: "siteId" },
|
||||
{ from: "poNumber", to: "customerPO" },
|
||||
{ from: "dateCloseExpected", to: "expectedCloseDate" },
|
||||
{ from: "datePipelineChange", to: "pipelineChangeDate" },
|
||||
{ from: "dateBecameLead", to: "dateBecameLead" },
|
||||
{ from: "dateClosed", to: "closedDate" },
|
||||
{ from: "oldCloseFlag", to: "closedFlag" },
|
||||
{
|
||||
from: "oldCloseFlag",
|
||||
to: "closedFlag",
|
||||
process: (_value, _context, row) =>
|
||||
row.soOppStatus?.closedFlag ?? row.oldCloseFlag ?? false,
|
||||
},
|
||||
{ from: "closedBy", to: "closedById" },
|
||||
{
|
||||
from: "updatedBy",
|
||||
|
||||
Reference in New Issue
Block a user