Compare commits
30 Commits
v0.1.30
...
setup-admin
| Author | SHA1 | Date | |
|---|---|---|---|
| a8c48e8c75 | |||
| 051edb5f78 | |||
| f87f6dd336 | |||
| 2eb387811d | |||
| db27c9224d | |||
| 7f6e6fdfbc | |||
| 5f5f610060 | |||
| 809841d672 | |||
| 276eb563bf | |||
| 7624ba0bc0 | |||
| 1063231107 | |||
| 2cd5dee612 | |||
| 8ac1cbaf3e | |||
| bd7e6a37cd | |||
| 4e0799f9d9 | |||
| 223a06ba27 | |||
| 503657d168 | |||
| cf68e281e8 | |||
| 57b5763d41 | |||
| 2bd498a35d | |||
| 86d7426e8b | |||
| afe56393e7 | |||
| b2cd26af30 | |||
| 0594816ea4 | |||
| 71fe36c0b8 | |||
| e0d575454e | |||
| 32bba31e72 | |||
| 1233535b20 | |||
| 2c737b22f1 | |||
| a3bfe9f374 |
@@ -18,7 +18,7 @@ jobs:
|
|||||||
bun-version: "1.3.6"
|
bun-version: "1.3.6"
|
||||||
|
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: bun install --frozen-lockfile
|
run: bun install
|
||||||
|
|
||||||
- name: Generate API Prisma client
|
- name: Generate API Prisma client
|
||||||
run: DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
|
run: DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ jobs:
|
|||||||
bun-version: "1.3.6"
|
bun-version: "1.3.6"
|
||||||
|
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: bun install --frozen-lockfile
|
run: bun install
|
||||||
|
|
||||||
- name: Generate Dalpuri Prisma client (CW MSSQL)
|
- name: Generate Dalpuri Prisma client (CW MSSQL)
|
||||||
run: DATABASE_URL="sqlserver://localhost:1433;database=dummy;user=dummy;password=dummy;trustServerCertificate=true" bunx prisma generate
|
run: DATABASE_URL="sqlserver://localhost:1433;database=dummy;user=dummy;password=dummy;trustServerCertificate=true" bunx prisma generate
|
||||||
|
|||||||
@@ -231,9 +231,10 @@ jobs:
|
|||||||
run: bun install --frozen-lockfile
|
run: bun install --frozen-lockfile
|
||||||
|
|
||||||
- name: Rebuild native modules
|
- name: Rebuild native modules
|
||||||
run: npm rebuild
|
run: npm rebuild --ignore-scripts
|
||||||
env:
|
env:
|
||||||
HUSKY: "0"
|
HUSKY: "0"
|
||||||
|
HUSKY_SKIP_INSTALL: "1"
|
||||||
|
|
||||||
- name: Build macOS distributables
|
- name: Build macOS distributables
|
||||||
run: bun run make:macos
|
run: bun run make:macos
|
||||||
@@ -272,9 +273,10 @@ jobs:
|
|||||||
run: bun install --frozen-lockfile
|
run: bun install --frozen-lockfile
|
||||||
|
|
||||||
- name: Rebuild native modules
|
- name: Rebuild native modules
|
||||||
run: npm rebuild
|
run: npm rebuild --ignore-scripts
|
||||||
env:
|
env:
|
||||||
HUSKY: "0"
|
HUSKY: "0"
|
||||||
|
HUSKY_SKIP_INSTALL: "1"
|
||||||
|
|
||||||
- name: Build Windows distributables
|
- name: Build Windows distributables
|
||||||
run: bun run make -- --platform win32
|
run: bun run make -- --platform win32
|
||||||
@@ -318,9 +320,9 @@ jobs:
|
|||||||
TAG=${{ github.event.release.tag_name }}
|
TAG=${{ github.event.release.tag_name }}
|
||||||
JOB="job/dalpuri-sync-${TAG}"
|
JOB="job/dalpuri-sync-${TAG}"
|
||||||
|
|
||||||
kubectl wait --for=condition=complete --timeout=1800s -n optima "$JOB" &
|
kubectl wait --for=condition=complete --timeout=7200s -n optima "$JOB" &
|
||||||
WAIT_COMPLETE=$!
|
WAIT_COMPLETE=$!
|
||||||
kubectl wait --for=condition=failed --timeout=1800s -n optima "$JOB" &
|
kubectl wait --for=condition=failed --timeout=7200s -n optima "$JOB" &
|
||||||
WAIT_FAILED=$!
|
WAIT_FAILED=$!
|
||||||
|
|
||||||
wait -n $WAIT_COMPLETE $WAIT_FAILED
|
wait -n $WAIT_COMPLETE $WAIT_FAILED
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ jobs:
|
|||||||
bun-version: "1.3.11"
|
bun-version: "1.3.11"
|
||||||
|
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: bun install --frozen-lockfile
|
run: bun install
|
||||||
|
|
||||||
- name: Run unit tests
|
- name: Run unit tests
|
||||||
run: bun run test:unit -- --run
|
run: bun run test:unit -- --run
|
||||||
|
|||||||
+34
-8
@@ -17,7 +17,7 @@ COPY dalpuri/package.json ./dalpuri/package.json
|
|||||||
COPY ui/package.json ./ui/package.json
|
COPY ui/package.json ./ui/package.json
|
||||||
COPY patches ./patches
|
COPY patches ./patches
|
||||||
|
|
||||||
RUN bun install --frozen-lockfile --production
|
RUN bun install --production
|
||||||
|
|
||||||
# ---- Stage 2: Build ----
|
# ---- Stage 2: Build ----
|
||||||
FROM oven/bun:1.3.11 AS build
|
FROM oven/bun:1.3.11 AS build
|
||||||
@@ -32,7 +32,7 @@ COPY ui/package.json ./ui/package.json
|
|||||||
COPY patches ./patches
|
COPY patches ./patches
|
||||||
|
|
||||||
# Install all deps (including dev) for the full workspace
|
# Install all deps (including dev) for the full workspace
|
||||||
RUN bun install --frozen-lockfile
|
RUN bun install
|
||||||
|
|
||||||
# Copy API source and config
|
# Copy API source and config
|
||||||
COPY api/src/ ./api/src/
|
COPY api/src/ ./api/src/
|
||||||
@@ -90,6 +90,13 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
|
|||||||
# Copy production node_modules (Prisma adapter needs native bindings)
|
# Copy production node_modules (Prisma adapter needs native bindings)
|
||||||
COPY --from=deps /app/node_modules/ ./node_modules/
|
COPY --from=deps /app/node_modules/ ./node_modules/
|
||||||
|
|
||||||
|
# Copy bun so prisma migrate deploy can run at container startup
|
||||||
|
COPY --from=build /usr/local/bin/bun /usr/local/bin/bun
|
||||||
|
RUN ln -s /usr/local/bin/bun /usr/local/bin/bunx
|
||||||
|
|
||||||
|
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
|
||||||
|
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
|
||||||
|
|
||||||
ENV NODE_ENV=production
|
ENV NODE_ENV=production
|
||||||
|
|
||||||
# ---- Stage 4: API server runtime image ----
|
# ---- Stage 4: API server runtime image ----
|
||||||
@@ -101,7 +108,7 @@ COPY --from=build /app/api/logo.png ./logo.png
|
|||||||
COPY --from=build /app/api/src/modules/sales-utils/salesTaxRates.json ./salesTaxRates.json
|
COPY --from=build /app/api/src/modules/sales-utils/salesTaxRates.json ./salesTaxRates.json
|
||||||
|
|
||||||
EXPOSE 3000
|
EXPOSE 3000
|
||||||
CMD ["./server"]
|
CMD ["sh", "-c", "bunx prisma migrate deploy && ./server"]
|
||||||
|
|
||||||
# ---- Stage 5: Worker runtime image ----
|
# ---- Stage 5: Worker runtime image ----
|
||||||
FROM runtime-base AS worker
|
FROM runtime-base AS worker
|
||||||
@@ -125,15 +132,13 @@ COPY dalpuri/package.json ./dalpuri/package.json
|
|||||||
COPY ui/package.json ./ui/package.json
|
COPY ui/package.json ./ui/package.json
|
||||||
COPY patches ./patches
|
COPY patches ./patches
|
||||||
|
|
||||||
RUN bun install --frozen-lockfile
|
RUN bun install
|
||||||
|
|
||||||
COPY api/prisma/ ./api/prisma/
|
COPY api/prisma/ ./api/prisma/
|
||||||
COPY api/prisma.config.ts ./api/prisma.config.ts
|
COPY api/prisma.config.ts ./api/prisma.config.ts
|
||||||
|
|
||||||
RUN chmod +x /app/api/prisma/migrate-entrypoint.sh
|
|
||||||
|
|
||||||
WORKDIR /app/api
|
WORKDIR /app/api
|
||||||
CMD ["sh", "prisma/migrate-entrypoint.sh"]
|
CMD ["bunx", "prisma", "migrate", "deploy"]
|
||||||
|
|
||||||
# ---- Stage 7: Dalpuri CW-to-API sync runner ----
|
# ---- Stage 7: Dalpuri CW-to-API sync runner ----
|
||||||
FROM oven/bun:1.3.11 AS dalpuri-sync
|
FROM oven/bun:1.3.11 AS dalpuri-sync
|
||||||
@@ -146,7 +151,7 @@ COPY dalpuri/package.json ./dalpuri/package.json
|
|||||||
COPY ui/package.json ./ui/package.json
|
COPY ui/package.json ./ui/package.json
|
||||||
COPY patches ./patches
|
COPY patches ./patches
|
||||||
|
|
||||||
RUN bun install --frozen-lockfile
|
RUN bun install
|
||||||
|
|
||||||
COPY dalpuri/src/ ./dalpuri/src/
|
COPY dalpuri/src/ ./dalpuri/src/
|
||||||
COPY dalpuri/prisma/ ./dalpuri/prisma/
|
COPY dalpuri/prisma/ ./dalpuri/prisma/
|
||||||
@@ -164,3 +169,24 @@ RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma gen
|
|||||||
|
|
||||||
WORKDIR /app/dalpuri
|
WORKDIR /app/dalpuri
|
||||||
CMD ["bun", "run", "src/sync.ts"]
|
CMD ["bun", "run", "src/sync.ts"]
|
||||||
|
|
||||||
|
FROM oven/bun:1.3.11 AS setup-admin
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY package.json bun.lock ./
|
||||||
|
COPY api/package.json ./api/package.json
|
||||||
|
COPY dalpuri/package.json ./dalpuri/package.json
|
||||||
|
COPY ui/package.json ./ui/package.json
|
||||||
|
COPY patches ./patches
|
||||||
|
|
||||||
|
RUN bun install
|
||||||
|
|
||||||
|
COPY api/prisma/ ./api/prisma/
|
||||||
|
COPY api/prisma.config.ts ./api/prisma.config.ts
|
||||||
|
COPY api/setup-admin.ts ./api/setup-admin.ts
|
||||||
|
|
||||||
|
WORKDIR /app/api
|
||||||
|
RUN DATABASE_URL="postgresql://dummy:dummy@localhost:5432/dummy" bunx prisma generate
|
||||||
|
|
||||||
|
CMD ["bun", "run", "setup-admin.ts"]
|
||||||
|
|||||||
@@ -20,6 +20,11 @@ spec:
|
|||||||
env:
|
env:
|
||||||
- name: MANAGER_SOCKET_URL
|
- name: MANAGER_SOCKET_URL
|
||||||
value: "http://optima-api.optima.svc.cluster.local:8671"
|
value: "http://optima-api.optima.svc.cluster.local:8671"
|
||||||
|
- name: API_DATABASE_URL
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: api-env-secret
|
||||||
|
key: DATABASE_URL
|
||||||
envFrom:
|
envFrom:
|
||||||
- secretRef:
|
- secretRef:
|
||||||
name: api-env-secret
|
name: api-env-secret
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
import { defineConfig, env } from 'prisma/config'
|
export default {
|
||||||
|
|
||||||
export default defineConfig({
|
|
||||||
schema: 'prisma/schema.prisma',
|
schema: 'prisma/schema.prisma',
|
||||||
migrations: {
|
migrations: {
|
||||||
path: 'prisma/migrations',
|
path: 'prisma/migrations',
|
||||||
},
|
},
|
||||||
datasource: {
|
datasource: {
|
||||||
url: env('DATABASE_URL'),
|
url: process.env.DATABASE_URL,
|
||||||
},
|
},
|
||||||
})
|
}
|
||||||
@@ -0,0 +1,66 @@
|
|||||||
|
import { PrismaClient } from './generated/prisma/client';
|
||||||
|
|
||||||
|
const prisma = new PrismaClient();
|
||||||
|
|
||||||
|
async function main() {
|
||||||
|
try {
|
||||||
|
// Create the admin role if it doesn't exist
|
||||||
|
const adminRole = await prisma.role.upsert({
|
||||||
|
where: { moniker: 'admin' },
|
||||||
|
update: {},
|
||||||
|
create: {
|
||||||
|
title: 'Administrator',
|
||||||
|
moniker: 'admin',
|
||||||
|
permissions: JSON.stringify({
|
||||||
|
// Full permissions for admin
|
||||||
|
'*': true,
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log('✓ Admin role created/verified:', adminRole);
|
||||||
|
|
||||||
|
// Find the user with jackson.roberts@totaltech.net
|
||||||
|
const user = await prisma.user.findUnique({
|
||||||
|
where: { email: 'jackson.roberts@totaltech.net' },
|
||||||
|
include: { roles: true },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!user) {
|
||||||
|
console.error(
|
||||||
|
'✗ User jackson.roberts@totaltech.net not found. Please ensure the user exists.',
|
||||||
|
);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log('✓ User found:', user.email);
|
||||||
|
|
||||||
|
// Check if user already has admin role
|
||||||
|
const hasAdminRole = user.roles.some((r) => r.moniker === 'admin');
|
||||||
|
|
||||||
|
if (hasAdminRole) {
|
||||||
|
console.log('✓ User already has admin role');
|
||||||
|
} else {
|
||||||
|
// Assign admin role to user
|
||||||
|
const updatedUser = await prisma.user.update({
|
||||||
|
where: { id: user.id },
|
||||||
|
data: {
|
||||||
|
roles: {
|
||||||
|
connect: { id: adminRole.id },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
include: { roles: true },
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log('✓ Admin role assigned to jackson.roberts@totaltech.net');
|
||||||
|
console.log('✓ User roles:', updatedUser.roles.map((r) => r.moniker));
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error('✗ Error:', error);
|
||||||
|
process.exit(1);
|
||||||
|
} finally {
|
||||||
|
await prisma.$disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
main();
|
||||||
@@ -2,7 +2,7 @@ import { createRoute } from "../../modules/api-utils/createRoute";
|
|||||||
import { apiResponse } from "../../modules/api-utils/apiResponse";
|
import { apiResponse } from "../../modules/api-utils/apiResponse";
|
||||||
import { ContentfulStatusCode } from "hono/utils/http-status";
|
import { ContentfulStatusCode } from "hono/utils/http-status";
|
||||||
import { authMiddleware } from "../middleware/authorization";
|
import { authMiddleware } from "../middleware/authorization";
|
||||||
import { getBoss } from "../../workert";
|
import { getBoss } from "../../boss-instance";
|
||||||
import { WorkerQueue } from "../../modules/workers/queues";
|
import { WorkerQueue } from "../../modules/workers/queues";
|
||||||
|
|
||||||
/* POST /v1/cw/sync/full */
|
/* POST /v1/cw/sync/full */
|
||||||
|
|||||||
@@ -0,0 +1,30 @@
|
|||||||
|
/**
|
||||||
|
* Shared PgBoss singleton — kept in its own module to break circular imports
|
||||||
|
* between workert.ts and the worker modules that call getBoss().
|
||||||
|
*/
|
||||||
|
import { PgBoss } from "pg-boss";
|
||||||
|
|
||||||
|
function makePgBossUrl(rawUrl: string): string {
|
||||||
|
try {
|
||||||
|
const u = new URL(rawUrl);
|
||||||
|
// 30-second statement timeout to prevent individual SQL queries from
|
||||||
|
// hanging indefinitely if the DB server stops responding mid-query.
|
||||||
|
u.searchParams.set("options", "-c statement_timeout=30000");
|
||||||
|
return u.toString();
|
||||||
|
} catch {
|
||||||
|
return rawUrl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const boss = new PgBoss({
|
||||||
|
connectionString: makePgBossUrl(process.env.DATABASE_URL!),
|
||||||
|
connectionTimeoutMillis: 15_000,
|
||||||
|
});
|
||||||
|
|
||||||
|
boss.on("error", (err) => {
|
||||||
|
console.error("[worker] PgBoss error", err);
|
||||||
|
});
|
||||||
|
|
||||||
|
export function getBoss(): PgBoss {
|
||||||
|
return boss;
|
||||||
|
}
|
||||||
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function formatOpportunityContactName(
|
||||||
|
firstName?: string | null,
|
||||||
|
lastName?: string | null
|
||||||
|
): string {
|
||||||
|
const first = (firstName ?? "").trim();
|
||||||
|
const last = (lastName ?? "").trim();
|
||||||
|
|
||||||
|
if (first && last.toLowerCase() === "contact") {
|
||||||
|
return first;
|
||||||
|
}
|
||||||
|
|
||||||
|
return `${first} ${last}`.trim();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opportunity Controller
|
* Opportunity Controller
|
||||||
*
|
*
|
||||||
@@ -290,7 +304,7 @@ export class OpportunityController {
|
|||||||
| null
|
| null
|
||||||
| undefined;
|
| undefined;
|
||||||
this.contactName = (data as any).contactName ?? (contactRel
|
this.contactName = (data as any).contactName ?? (contactRel
|
||||||
? `${contactRel.firstName} ${contactRel.lastName}`.trim()
|
? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
|
||||||
: null);
|
: null);
|
||||||
|
|
||||||
// Site
|
// Site
|
||||||
@@ -674,7 +688,7 @@ export class OpportunityController {
|
|||||||
id: contact.id,
|
id: contact.id,
|
||||||
contact: {
|
contact: {
|
||||||
id: contact.id,
|
id: contact.id,
|
||||||
name: `${contact.firstName} ${contact.lastName}`.trim(),
|
name: formatOpportunityContactName(contact.firstName, contact.lastName),
|
||||||
},
|
},
|
||||||
company: contact.company
|
company: contact.company
|
||||||
? {
|
? {
|
||||||
|
|||||||
+2
-1
@@ -6,7 +6,8 @@ import { events } from "./modules/globalEvents";
|
|||||||
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
||||||
import { signPermissions } from "./modules/permission-utils/signPermissions";
|
import { signPermissions } from "./modules/permission-utils/signPermissions";
|
||||||
import { RoleController } from "./controllers/RoleController";
|
import { RoleController } from "./controllers/RoleController";
|
||||||
import { initializeWorkerSystem, getBoss } from "./workert";
|
import { initializeWorkerSystem } from "./workert";
|
||||||
|
import { getBoss } from "./boss-instance";
|
||||||
import { WorkerQueue } from "./modules/workers/queues";
|
import { WorkerQueue } from "./modules/workers/queues";
|
||||||
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
|
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
|
||||||
import { startCommsServer } from "./modules/workers/coms";
|
import { startCommsServer } from "./modules/workers/coms";
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import PdfPrinter from "pdfmake/src/Printer";
|
import PdfPrinter from "pdfmake/src/Printer";
|
||||||
import { readFileSync } from "node:fs";
|
import { existsSync, readFileSync } from "node:fs";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
|
|
||||||
export interface QuoteLineItem {
|
export interface QuoteLineItem {
|
||||||
@@ -110,7 +110,26 @@ const COMPANY = {
|
|||||||
|
|
||||||
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
|
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
|
||||||
|
|
||||||
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto");
|
function resolveRobotoFontDir(): string {
|
||||||
|
const candidates = [
|
||||||
|
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (const dir of candidates) {
|
||||||
|
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
|
||||||
|
return dir;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error(
|
||||||
|
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const fontDir = resolveRobotoFontDir();
|
||||||
const fonts = {
|
const fonts = {
|
||||||
Roboto: {
|
Roboto: {
|
||||||
normal: join(fontDir, "Roboto-Regular.ttf"),
|
normal: join(fontDir, "Roboto-Regular.ttf"),
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { Server } from "socket.io";
|
import { Server } from "socket.io";
|
||||||
import { events, EventTypes } from "../globalEvents";
|
import { events, EventTypes } from "../globalEvents";
|
||||||
import { WorkerQueue } from "./queues";
|
import { WorkerQueue } from "./queues";
|
||||||
|
import { reserveWorkerId } from "../../workert";
|
||||||
|
|
||||||
function emitGlobalEvent<K extends keyof EventTypes>(
|
function emitGlobalEvent<K extends keyof EventTypes>(
|
||||||
name: K,
|
name: K,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { Socket } from "socket.io-client";
|
import { Socket } from "socket.io-client";
|
||||||
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
||||||
|
import { prisma } from "../../constants";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
||||||
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
|
|||||||
* Called every 5 seconds via PgBoss from the API process interval.
|
* Called every 5 seconds via PgBoss from the API process interval.
|
||||||
*/
|
*/
|
||||||
export async function executeIncrementalSync(): Promise<void> {
|
export async function executeIncrementalSync(): Promise<void> {
|
||||||
return executeForcedIncrementalDalpuriSync();
|
let jobRunId: string | undefined;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const run = await prisma.syncJobRun.create({
|
||||||
|
data: {
|
||||||
|
jobType: "INCREMENTAL_SYNC",
|
||||||
|
status: "RUNNING",
|
||||||
|
triggeredBy: "worker",
|
||||||
|
startedAt: new Date(),
|
||||||
|
},
|
||||||
|
select: { id: true },
|
||||||
|
});
|
||||||
|
jobRunId = run.id;
|
||||||
|
} catch (err) {
|
||||||
|
// Sync should still run even if tracking insert fails.
|
||||||
|
console.error("[sync] Failed to create incremental SyncJobRun", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await executeForcedIncrementalDalpuriSync({ jobRunId });
|
||||||
|
|
||||||
|
if (jobRunId) {
|
||||||
|
await prisma.syncJobRun.update({
|
||||||
|
where: { id: jobRunId },
|
||||||
|
data: {
|
||||||
|
status: "COMPLETED",
|
||||||
|
completedAt: new Date(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
if (jobRunId) {
|
||||||
|
const errorSummary = err instanceof Error ? err.message : String(err);
|
||||||
|
await prisma.syncJobRun
|
||||||
|
.update({
|
||||||
|
where: { id: jobRunId },
|
||||||
|
data: {
|
||||||
|
status: "FAILED",
|
||||||
|
completedAt: new Date(),
|
||||||
|
errorSummary: errorSummary.slice(0, 2000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.catch(() => {
|
||||||
|
// Best-effort update only.
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { getBoss } from "../../workert";
|
import { getBoss } from "../../boss-instance";
|
||||||
import { WorkerQueue } from "./queues";
|
import { WorkerQueue } from "./queues";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
|
|||||||
* Called on an interval from the main API process so it survives worker restarts.
|
* Called on an interval from the main API process so it survives worker restarts.
|
||||||
*/
|
*/
|
||||||
export async function enqueueIncrementalSync(): Promise<void> {
|
export async function enqueueIncrementalSync(): Promise<void> {
|
||||||
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {});
|
const jobId = await getBoss().send(
|
||||||
|
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
|
||||||
|
{
|
||||||
|
enqueuedAt: new Date().toISOString(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
singletonKey: "dalpuri-incremental-sync",
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!jobId) {
|
||||||
|
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
|
|||||||
queueType: WorkerQueue,
|
queueType: WorkerQueue,
|
||||||
workFn: (workerSocket: Socket) => Promise<T>,
|
workFn: (workerSocket: Socket) => Promise<T>,
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
|
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
// Request a worker ID and namespace from the manager
|
// Request a worker ID and namespace from the manager
|
||||||
socket.emit(
|
socket.emit(
|
||||||
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Connect to the worker-specific namespace
|
// Connect to the worker-specific namespace
|
||||||
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, {
|
const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
|
||||||
reconnection: false,
|
reconnection: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
+61
-23
@@ -1,13 +1,7 @@
|
|||||||
import { PgBoss } from "pg-boss";
|
|
||||||
import { io, Socket } from "socket.io-client";
|
import { io, Socket } from "socket.io-client";
|
||||||
import { WorkerQueue } from "./modules/workers/queues";
|
import { WorkerQueue } from "./modules/workers/queues";
|
||||||
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
||||||
|
import { boss, getBoss } from "./boss-instance";
|
||||||
const boss = new PgBoss(process.env.DATABASE_URL!);
|
|
||||||
|
|
||||||
boss.on("error", (err) => {
|
|
||||||
console.error("[worker] PgBoss error", err);
|
|
||||||
});
|
|
||||||
|
|
||||||
let bossStartPromise: Promise<void> | null = null;
|
let bossStartPromise: Promise<void> | null = null;
|
||||||
let reservationQueueReady = false;
|
let reservationQueueReady = false;
|
||||||
@@ -111,19 +105,25 @@ export async function reserveWorkerId(queueType: WorkerQueue): Promise<string> {
|
|||||||
|
|
||||||
async function ensureDalpuriSyncQueue(): Promise<void> {
|
async function ensureDalpuriSyncQueue(): Promise<void> {
|
||||||
try {
|
try {
|
||||||
|
console.log("[worker] Creating DALPURI_FULL_SYNC queue...");
|
||||||
await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC);
|
await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC);
|
||||||
} catch {
|
console.log("[worker] DALPURI_FULL_SYNC queue ready");
|
||||||
// Queue may already exist; ignore to keep this idempotent.
|
} catch (err) {
|
||||||
|
console.log("[worker] DALPURI_FULL_SYNC queue already exists (or error):", (err as Error).message);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
console.log("[worker] Creating DALPURI_INCREMENTAL_SYNC queue...");
|
||||||
await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC);
|
await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC);
|
||||||
} catch {
|
console.log("[worker] DALPURI_INCREMENTAL_SYNC queue ready");
|
||||||
// Queue may already exist; ignore to keep this idempotent.
|
} catch (err) {
|
||||||
|
console.log("[worker] DALPURI_INCREMENTAL_SYNC queue already exists (or error):", (err as Error).message);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
console.log("[worker] Creating REFRESH_SALES_METRICS queue...");
|
||||||
await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS);
|
await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS);
|
||||||
} catch {
|
console.log("[worker] REFRESH_SALES_METRICS queue ready");
|
||||||
// Queue may already exist; ignore to keep this idempotent.
|
} catch (err) {
|
||||||
|
console.log("[worker] REFRESH_SALES_METRICS queue already exists (or error):", (err as Error).message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,14 +138,6 @@ export async function initializeWorkerSystem(): Promise<void> {
|
|||||||
console.log("[worker] Worker system initialized - ready for job enqueueing");
|
console.log("[worker] Worker system initialized - ready for job enqueueing");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the PgBoss instance for direct job enqueueing.
|
|
||||||
* Must call initializeWorkerSystem() first.
|
|
||||||
*/
|
|
||||||
export function getBoss(): PgBoss {
|
|
||||||
return boss;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (import.meta.main) {
|
if (import.meta.main) {
|
||||||
// if (Bun.env.NODE_ENV === "development") {
|
// if (Bun.env.NODE_ENV === "development") {
|
||||||
// setupEventDebugger({ processLabel: "WORKER" });
|
// setupEventDebugger({ processLabel: "WORKER" });
|
||||||
@@ -155,14 +147,33 @@ if (import.meta.main) {
|
|||||||
console.log(
|
console.log(
|
||||||
`[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}`
|
`[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}`
|
||||||
);
|
);
|
||||||
|
console.log(`[worker] DATABASE_URL set: ${!!process.env.DATABASE_URL}`);
|
||||||
|
|
||||||
// Ensure PgBoss is connected and queues exist
|
// Ensure PgBoss is connected and queues exist
|
||||||
await ensureBossStarted();
|
console.log("[worker] Starting PgBoss...");
|
||||||
|
try {
|
||||||
|
await Promise.race([
|
||||||
|
ensureBossStarted(),
|
||||||
|
new Promise<never>((_, reject) =>
|
||||||
|
setTimeout(() => reject(new Error("boss.start() timed out after 30s")), 30_000)
|
||||||
|
),
|
||||||
|
]);
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[worker] FATAL: PgBoss failed to start:", err);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
console.log("[worker] PgBoss started successfully");
|
||||||
|
console.log("[worker] Ensuring sync queues...");
|
||||||
await ensureDalpuriSyncQueue();
|
await ensureDalpuriSyncQueue();
|
||||||
|
console.log("[worker] Sync queues ready");
|
||||||
|
|
||||||
// Register job handler for DALPURI_FULL_SYNC
|
// Register job handler for DALPURI_FULL_SYNC
|
||||||
|
console.log("[worker] Importing sync-manager...");
|
||||||
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
||||||
|
console.log("[worker] Importing dalpuri-sync...");
|
||||||
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
||||||
|
console.log("[worker] Importing incremental-sync...");
|
||||||
|
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
|
||||||
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
||||||
const socket = await ensureManagerSocketReady();
|
const socket = await ensureManagerSocketReady();
|
||||||
await enqueueDalpuriFullSync(socket);
|
await enqueueDalpuriFullSync(socket);
|
||||||
@@ -170,10 +181,37 @@ if (import.meta.main) {
|
|||||||
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
||||||
|
|
||||||
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
||||||
await executeIncrementalSync();
|
const startedAt = Date.now();
|
||||||
|
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
|
||||||
|
try {
|
||||||
|
await executeIncrementalSync();
|
||||||
|
console.log(
|
||||||
|
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(
|
||||||
|
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
||||||
|
|
||||||
|
const enqueueIncrementalWithLogging = () => {
|
||||||
|
enqueueIncrementalSync().catch((err) => {
|
||||||
|
console.error(
|
||||||
|
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
|
||||||
|
);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
// Keep a worker-local 5s scheduler so incremental sync continues even when
|
||||||
|
// API interval scheduling is unavailable.
|
||||||
|
enqueueIncrementalWithLogging();
|
||||||
|
setInterval(enqueueIncrementalWithLogging, 5_000);
|
||||||
|
console.log("[worker] Started 5-second incremental enqueue interval");
|
||||||
|
|
||||||
// Register job handler for REFRESH_SALES_METRICS
|
// Register job handler for REFRESH_SALES_METRICS
|
||||||
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
||||||
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
||||||
|
|||||||
@@ -621,7 +621,7 @@
|
|||||||
|
|
||||||
"@types/aria-query": ["@types/aria-query@5.0.4", "", {}, "sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw=="],
|
"@types/aria-query": ["@types/aria-query@5.0.4", "", {}, "sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw=="],
|
||||||
|
|
||||||
"@types/bun": ["@types/bun@1.3.11", "", { "dependencies": { "bun-types": "1.3.11" } }, "sha512-5vPne5QvtpjGpsGYXiFyycfpDF2ECyPcTSsFBMa0fraoxiQyMJ3SmuQIGhzPg2WJuWxVBoxWJ2kClYTcw/4fAg=="],
|
"@types/bun": ["@types/bun@1.3.12", "", { "dependencies": { "bun-types": "1.3.12" } }, "sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A=="],
|
||||||
|
|
||||||
"@types/cacheable-request": ["@types/cacheable-request@6.0.3", "", { "dependencies": { "@types/http-cache-semantics": "*", "@types/keyv": "^3.1.4", "@types/node": "*", "@types/responselike": "^1.0.0" } }, "sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw=="],
|
"@types/cacheable-request": ["@types/cacheable-request@6.0.3", "", { "dependencies": { "@types/http-cache-semantics": "*", "@types/keyv": "^3.1.4", "@types/node": "*", "@types/responselike": "^1.0.0" } }, "sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw=="],
|
||||||
|
|
||||||
@@ -841,7 +841,7 @@
|
|||||||
|
|
||||||
"buffer-from": ["buffer-from@1.1.2", "", {}, "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ=="],
|
"buffer-from": ["buffer-from@1.1.2", "", {}, "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ=="],
|
||||||
|
|
||||||
"bun-types": ["bun-types@1.3.11", "", { "dependencies": { "@types/node": "*" } }, "sha512-1KGPpoxQWl9f6wcZh57LvrPIInQMn2TQ7jsgxqpRzg+l0QPOFvJVH7HmvHo/AiPgwXy+/Thf6Ov3EdVn1vOabg=="],
|
"bun-types": ["bun-types@1.3.12", "", { "dependencies": { "@types/node": "*" } }, "sha512-HqOLj5PoFajAQciOMRiIZGNoKxDJSr6qigAttOX40vJuSp6DN/CxWp9s3C1Xwm4oH7ybueITwiaOcWXoYVoRkA=="],
|
||||||
|
|
||||||
"bundle-name": ["bundle-name@4.1.0", "", { "dependencies": { "run-applescript": "^7.0.0" } }, "sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q=="],
|
"bundle-name": ["bundle-name@4.1.0", "", { "dependencies": { "run-applescript": "^7.0.0" } }, "sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q=="],
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ metadata:
|
|||||||
spec:
|
spec:
|
||||||
backoffLimit: 0
|
backoffLimit: 0
|
||||||
ttlSecondsAfterFinished: 86400
|
ttlSecondsAfterFinished: 86400
|
||||||
activeDeadlineSeconds: 1800
|
activeDeadlineSeconds: 7200
|
||||||
template:
|
template:
|
||||||
metadata:
|
metadata:
|
||||||
labels:
|
labels:
|
||||||
|
|||||||
@@ -294,6 +294,22 @@ const refreshContextFromApi = async (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const cwMembers = await apiPrisma.cwMember.findMany({
|
||||||
|
select: { cwMemberId: true, identifier: true },
|
||||||
|
});
|
||||||
|
for (const member of cwMembers) {
|
||||||
|
if (
|
||||||
|
member.cwMemberId != null &&
|
||||||
|
member.identifier &&
|
||||||
|
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
|
||||||
|
) {
|
||||||
|
context.userIdentifiersByMemberRecId.set(
|
||||||
|
member.cwMemberId,
|
||||||
|
member.identifier
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (const board of boards) {
|
for (const board of boards) {
|
||||||
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
||||||
}
|
}
|
||||||
@@ -426,6 +442,12 @@ const sanitizeModelData = (
|
|||||||
) {
|
) {
|
||||||
sanitized.statusId = null;
|
sanitized.statusId = null;
|
||||||
}
|
}
|
||||||
|
if (
|
||||||
|
sanitized.locationId != null &&
|
||||||
|
!context.corporateLocationIds.has(sanitized.locationId as number)
|
||||||
|
) {
|
||||||
|
sanitized.locationId = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (targetModel === "schedule") {
|
if (targetModel === "schedule") {
|
||||||
@@ -734,6 +756,11 @@ const getConfigForTable = (table: string): SyncTableConfig | null => {
|
|||||||
secondarySalesFlag: true,
|
secondarySalesFlag: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
soOppStatus: {
|
||||||
|
select: {
|
||||||
|
closedFlag: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
+216
-19
@@ -75,6 +75,136 @@ type DeleteResult = {
|
|||||||
|
|
||||||
let incrementalDeleteStepIndex = 0;
|
let incrementalDeleteStepIndex = 0;
|
||||||
|
|
||||||
|
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
||||||
|
"Companies",
|
||||||
|
"Company Addresses",
|
||||||
|
"Contacts",
|
||||||
|
"Opportunities",
|
||||||
|
]);
|
||||||
|
|
||||||
|
const CRITICAL_CW_WATERMARK_TABLES = new Set([
|
||||||
|
"Companies",
|
||||||
|
"Company Addresses",
|
||||||
|
"Contacts",
|
||||||
|
"Opportunities",
|
||||||
|
]);
|
||||||
|
|
||||||
|
const criticalFullSyncIntervalMinutes = Math.max(
|
||||||
|
1,
|
||||||
|
Number.parseInt(
|
||||||
|
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
|
||||||
|
10
|
||||||
|
) || 15
|
||||||
|
);
|
||||||
|
|
||||||
|
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||||
|
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||||
|
|
||||||
|
const criticalCwWatermarkOverlapSeconds = Math.max(
|
||||||
|
5,
|
||||||
|
Number.parseInt(
|
||||||
|
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
|
||||||
|
10
|
||||||
|
) || 60
|
||||||
|
);
|
||||||
|
|
||||||
|
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
|
||||||
|
criticalCwWatermarkOverlapSeconds * 1000;
|
||||||
|
|
||||||
|
const criticalCwDeltaLimit = Math.max(
|
||||||
|
100,
|
||||||
|
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
|
||||||
|
5000
|
||||||
|
);
|
||||||
|
|
||||||
|
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||||
|
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
|
||||||
|
|
||||||
|
const shouldForceCriticalFullSync = (
|
||||||
|
step: Step,
|
||||||
|
forceIncremental: boolean
|
||||||
|
): boolean => {
|
||||||
|
if (!forceIncremental) return false;
|
||||||
|
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
|
||||||
|
|
||||||
|
const now = Date.now();
|
||||||
|
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
|
||||||
|
|
||||||
|
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastCriticalFullSyncByStep.set(step.name, now);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
const computeCriticalCwWatermarkDecision = async (
|
||||||
|
cwPrisma: CwPrismaClient,
|
||||||
|
step: Step,
|
||||||
|
forceIncremental: boolean
|
||||||
|
): Promise<SmartSyncDecision | null> => {
|
||||||
|
if (!forceIncremental) return null;
|
||||||
|
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
|
||||||
|
|
||||||
|
const cwDelegate = (
|
||||||
|
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
||||||
|
)[step.sourceModel];
|
||||||
|
|
||||||
|
if (!cwDelegate) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const existingWhere =
|
||||||
|
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
|
||||||
|
|
||||||
|
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
|
||||||
|
const lowerBound = lastWatermark
|
||||||
|
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
|
||||||
|
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
|
||||||
|
|
||||||
|
const rows = (await cwDelegate.findMany({
|
||||||
|
select: {
|
||||||
|
[step.sourceIdField]: true,
|
||||||
|
[step.sourceUpdatedField]: true,
|
||||||
|
},
|
||||||
|
where: {
|
||||||
|
...(existingWhere as Record<string, unknown>),
|
||||||
|
[step.sourceUpdatedField]: {
|
||||||
|
gte: lowerBound,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
orderBy: { [step.sourceUpdatedField]: "asc" },
|
||||||
|
take: criticalCwDeltaLimit,
|
||||||
|
})) as Row[];
|
||||||
|
|
||||||
|
if (rows.length >= criticalCwDeltaLimit) {
|
||||||
|
console.warn(
|
||||||
|
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
|
||||||
|
);
|
||||||
|
return { mode: "full", differences: [] };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rows.length > 0) {
|
||||||
|
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
|
||||||
|
if (latest) {
|
||||||
|
lastCriticalCwWatermarkByStep.set(step.name, latest);
|
||||||
|
}
|
||||||
|
} else if (!lastWatermark) {
|
||||||
|
lastCriticalCwWatermarkByStep.set(step.name, new Date());
|
||||||
|
}
|
||||||
|
|
||||||
|
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
|
||||||
|
console.log(
|
||||||
|
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
|
||||||
|
);
|
||||||
|
|
||||||
|
return {
|
||||||
|
mode: "incremental",
|
||||||
|
sourceIds,
|
||||||
|
differences: [],
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
const parseEnvFile = (path: string): Record<string, string> => {
|
const parseEnvFile = (path: string): Record<string, string> => {
|
||||||
const envData = readFileSync(path, "utf8");
|
const envData = readFileSync(path, "utf8");
|
||||||
const out: Record<string, string> = {};
|
const out: Record<string, string> = {};
|
||||||
@@ -107,6 +237,20 @@ const resolveApiDatabaseUrl = (): string => {
|
|||||||
if (process.env.OPTIMA_API_DATABASE_URL)
|
if (process.env.OPTIMA_API_DATABASE_URL)
|
||||||
return process.env.OPTIMA_API_DATABASE_URL;
|
return process.env.OPTIMA_API_DATABASE_URL;
|
||||||
|
|
||||||
|
// Worker/runtime fallback:
|
||||||
|
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
|
||||||
|
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
|
||||||
|
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
|
||||||
|
return process.env.DATABASE_URL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
process.env.DATABASE_URL &&
|
||||||
|
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
|
||||||
|
) {
|
||||||
|
return process.env.DATABASE_URL;
|
||||||
|
}
|
||||||
|
|
||||||
const candidates = [
|
const candidates = [
|
||||||
resolve(import.meta.dir, "../../api/.env"),
|
resolve(import.meta.dir, "../../api/.env"),
|
||||||
resolve(process.cwd(), "../api/.env"),
|
resolve(process.cwd(), "../api/.env"),
|
||||||
@@ -323,6 +467,22 @@ const refreshContextFromApi = async (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const cwMembers = await apiPrisma.cwMember.findMany({
|
||||||
|
select: { cwMemberId: true, identifier: true },
|
||||||
|
});
|
||||||
|
for (const member of cwMembers) {
|
||||||
|
if (
|
||||||
|
member.cwMemberId != null &&
|
||||||
|
member.identifier &&
|
||||||
|
!context.userIdentifiersByMemberRecId.has(member.cwMemberId)
|
||||||
|
) {
|
||||||
|
context.userIdentifiersByMemberRecId.set(
|
||||||
|
member.cwMemberId,
|
||||||
|
member.identifier
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (const board of boards) {
|
for (const board of boards) {
|
||||||
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
context.serviceTicketBoardUidsById.set(board.id, board.uid);
|
||||||
}
|
}
|
||||||
@@ -636,6 +796,13 @@ const sanitizeModelData = (
|
|||||||
) {
|
) {
|
||||||
sanitized.stageId = null;
|
sanitized.stageId = null;
|
||||||
}
|
}
|
||||||
|
// Nullify locationId if the corporate location doesn't exist
|
||||||
|
if (
|
||||||
|
sanitized.locationId != null &&
|
||||||
|
!context.corporateLocationIds.has(sanitized.locationId as number)
|
||||||
|
) {
|
||||||
|
sanitized.locationId = null;
|
||||||
|
}
|
||||||
// Nullify taxCodeId if the tax code hasn't synced yet
|
// Nullify taxCodeId if the tax code hasn't synced yet
|
||||||
if (
|
if (
|
||||||
sanitized.taxCodeId != null &&
|
sanitized.taxCodeId != null &&
|
||||||
@@ -1328,6 +1495,15 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
|
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
|
||||||
|
|
||||||
const steps: Step[] = [
|
const steps: Step[] = [
|
||||||
|
{
|
||||||
|
name: "CW Members",
|
||||||
|
sourceModel: "member",
|
||||||
|
targetModel: "cwMember",
|
||||||
|
translation: cwMemberTranslation as unknown as AnyTranslation,
|
||||||
|
uniqueField: "cwMemberId",
|
||||||
|
sourceIdField: "memberRecId",
|
||||||
|
sourceUpdatedField: "lastUpdatedUtc",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "Users",
|
name: "Users",
|
||||||
sourceModel: "member",
|
sourceModel: "member",
|
||||||
@@ -1342,15 +1518,6 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "CW Members",
|
|
||||||
sourceModel: "member",
|
|
||||||
targetModel: "cwMember",
|
|
||||||
translation: cwMemberTranslation as unknown as AnyTranslation,
|
|
||||||
uniqueField: "cwMemberId",
|
|
||||||
sourceIdField: "memberRecId",
|
|
||||||
sourceUpdatedField: "lastUpdatedUtc",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "Companies",
|
name: "Companies",
|
||||||
sourceModel: "company",
|
sourceModel: "company",
|
||||||
@@ -1585,6 +1752,11 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
secondarySalesFlag: true,
|
secondarySalesFlag: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
soOppStatus: {
|
||||||
|
select: {
|
||||||
|
closedFlag: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -1729,19 +1901,40 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
step,
|
step,
|
||||||
forceIncremental
|
forceIncremental
|
||||||
);
|
);
|
||||||
|
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
|
||||||
|
cwPrisma,
|
||||||
|
step,
|
||||||
|
forceIncremental
|
||||||
|
);
|
||||||
|
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||||
|
step,
|
||||||
|
forceIncremental
|
||||||
|
);
|
||||||
|
const effectiveDecision = forceCriticalFullSync
|
||||||
|
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||||
|
: criticalWatermarkDecision ?? decision;
|
||||||
|
|
||||||
|
if (forceCriticalFullSync) {
|
||||||
|
console.log(
|
||||||
|
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
const sourceIdsFilter =
|
const sourceIdsFilter =
|
||||||
decision.mode === "incremental" ? decision.sourceIds : undefined;
|
effectiveDecision.mode === "incremental"
|
||||||
|
? effectiveDecision.sourceIds
|
||||||
|
: undefined;
|
||||||
console.log(
|
console.log(
|
||||||
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
||||||
decision.mode
|
effectiveDecision.mode
|
||||||
}${
|
}${
|
||||||
decision.mode === "incremental"
|
effectiveDecision.mode === "incremental"
|
||||||
? ` (${decision.sourceIds.length} ids)`
|
? ` (${effectiveDecision.sourceIds.length} ids)`
|
||||||
: ""
|
: ""
|
||||||
}`
|
}`
|
||||||
);
|
);
|
||||||
if (logAllDifferences) {
|
if (logAllDifferences) {
|
||||||
logAllSmartSyncDifferences(step, decision.differences);
|
logAllSmartSyncDifferences(step, effectiveDecision.differences);
|
||||||
}
|
}
|
||||||
const result = await syncStep(
|
const result = await syncStep(
|
||||||
cwPrisma,
|
cwPrisma,
|
||||||
@@ -1763,7 +1956,7 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
|
|
||||||
await writeStepLog(
|
await writeStepLog(
|
||||||
step.name,
|
step.name,
|
||||||
decision.mode,
|
effectiveDecision.mode,
|
||||||
result,
|
result,
|
||||||
{ deleted: 0, failed: 0 },
|
{ deleted: 0, failed: 0 },
|
||||||
Date.now() - stepStart
|
Date.now() - stepStart
|
||||||
@@ -1860,8 +2053,12 @@ export const executeForcedIncrementalDalpuriSync = async (options?: {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if (import.meta.main) {
|
if (import.meta.main) {
|
||||||
executeFullDalpuriSync().catch((error) => {
|
executeFullDalpuriSync()
|
||||||
console.error("CW -> API sync failed:", error);
|
.then(() => {
|
||||||
process.exit(1);
|
process.exit(0);
|
||||||
});
|
})
|
||||||
|
.catch((error) => {
|
||||||
|
console.error("CW -> API sync failed:", error);
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
|
|||||||
{
|
{
|
||||||
from: "lastName",
|
from: "lastName",
|
||||||
to: "lastName",
|
to: "lastName",
|
||||||
process: (value) => (value ? value : "Contact"),
|
process: (value) => (value ? value : ""),
|
||||||
},
|
},
|
||||||
{ from: "nickName", to: "nickname" },
|
{ from: "nickName", to: "nickname" },
|
||||||
{ from: "title", to: "title" },
|
{ from: "title", to: "title" },
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import {
|
import {
|
||||||
Opportunity as CwOpportunity,
|
Opportunity as CwOpportunity,
|
||||||
OpportunityMember as CwOpportunityMember,
|
OpportunityMember as CwOpportunityMember,
|
||||||
|
SoOppStatus as CwSoOppStatus,
|
||||||
} from "../../generated/prisma/client";
|
} from "../../generated/prisma/client";
|
||||||
import { OpportunityInterest } from "../../../api/generated/prisma/client";
|
import { OpportunityInterest } from "../../../api/generated/prisma/client";
|
||||||
import { Translation, skipRow } from "./types";
|
import { Translation, skipRow } from "./types";
|
||||||
@@ -30,6 +31,7 @@ type ApiOpportunityRecord = {
|
|||||||
dateBecameLead?: Date | null;
|
dateBecameLead?: Date | null;
|
||||||
closedDate?: Date | null;
|
closedDate?: Date | null;
|
||||||
closedFlag: boolean;
|
closedFlag: boolean;
|
||||||
|
locationId?: number | null;
|
||||||
closedById?: string | null;
|
closedById?: string | null;
|
||||||
updatedBy: string;
|
updatedBy: string;
|
||||||
eneteredBy: string;
|
eneteredBy: string;
|
||||||
@@ -42,6 +44,7 @@ type CwOpportunityWithMembers = CwOpportunity & {
|
|||||||
CwOpportunityMember,
|
CwOpportunityMember,
|
||||||
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
|
"memberRecId" | "primarySalesFlag" | "secondarySalesFlag"
|
||||||
>[];
|
>[];
|
||||||
|
soOppStatus?: Pick<CwSoOppStatus, "closedFlag"> | null;
|
||||||
};
|
};
|
||||||
|
|
||||||
const toInterest = (value: number | null): OpportunityInterest | null => {
|
const toInterest = (value: number | null): OpportunityInterest | null => {
|
||||||
@@ -119,13 +122,19 @@ export const opportunityTranslation: Translation<
|
|||||||
},
|
},
|
||||||
{ from: "companyRecId", to: "companyId" },
|
{ from: "companyRecId", to: "companyId" },
|
||||||
{ from: "contactRecId", to: "contactId" },
|
{ from: "contactRecId", to: "contactId" },
|
||||||
|
{ from: "ownerLevelRecId", to: "locationId" },
|
||||||
{ from: "companyAddressRecId", to: "siteId" },
|
{ from: "companyAddressRecId", to: "siteId" },
|
||||||
{ from: "poNumber", to: "customerPO" },
|
{ from: "poNumber", to: "customerPO" },
|
||||||
{ from: "dateCloseExpected", to: "expectedCloseDate" },
|
{ from: "dateCloseExpected", to: "expectedCloseDate" },
|
||||||
{ from: "datePipelineChange", to: "pipelineChangeDate" },
|
{ from: "datePipelineChange", to: "pipelineChangeDate" },
|
||||||
{ from: "dateBecameLead", to: "dateBecameLead" },
|
{ from: "dateBecameLead", to: "dateBecameLead" },
|
||||||
{ from: "dateClosed", to: "closedDate" },
|
{ from: "dateClosed", to: "closedDate" },
|
||||||
{ from: "oldCloseFlag", to: "closedFlag" },
|
{
|
||||||
|
from: "oldCloseFlag",
|
||||||
|
to: "closedFlag",
|
||||||
|
process: (_value, _context, row) =>
|
||||||
|
row.soOppStatus?.closedFlag ?? row.oldCloseFlag ?? false,
|
||||||
|
},
|
||||||
{ from: "closedBy", to: "closedById" },
|
{ from: "closedBy", to: "closedById" },
|
||||||
{
|
{
|
||||||
from: "updatedBy",
|
from: "updatedBy",
|
||||||
|
|||||||
Reference in New Issue
Block a user