Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 86d7426e8b | |||
| afe56393e7 | |||
| b2cd26af30 | |||
| 0594816ea4 | |||
| 71fe36c0b8 | |||
| e0d575454e |
@@ -231,9 +231,10 @@ jobs:
|
|||||||
run: bun install --frozen-lockfile
|
run: bun install --frozen-lockfile
|
||||||
|
|
||||||
- name: Rebuild native modules
|
- name: Rebuild native modules
|
||||||
run: npm rebuild
|
run: npm rebuild --ignore-scripts
|
||||||
env:
|
env:
|
||||||
HUSKY: "0"
|
HUSKY: "0"
|
||||||
|
HUSKY_SKIP_INSTALL: "1"
|
||||||
|
|
||||||
- name: Build macOS distributables
|
- name: Build macOS distributables
|
||||||
run: bun run make:macos
|
run: bun run make:macos
|
||||||
@@ -272,9 +273,10 @@ jobs:
|
|||||||
run: bun install --frozen-lockfile
|
run: bun install --frozen-lockfile
|
||||||
|
|
||||||
- name: Rebuild native modules
|
- name: Rebuild native modules
|
||||||
run: npm rebuild
|
run: npm rebuild --ignore-scripts
|
||||||
env:
|
env:
|
||||||
HUSKY: "0"
|
HUSKY: "0"
|
||||||
|
HUSKY_SKIP_INSTALL: "1"
|
||||||
|
|
||||||
- name: Build Windows distributables
|
- name: Build Windows distributables
|
||||||
run: bun run make -- --platform win32
|
run: bun run make -- --platform win32
|
||||||
|
|||||||
@@ -90,6 +90,9 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
|
|||||||
# Copy production node_modules (Prisma adapter needs native bindings)
|
# Copy production node_modules (Prisma adapter needs native bindings)
|
||||||
COPY --from=deps /app/node_modules/ ./node_modules/
|
COPY --from=deps /app/node_modules/ ./node_modules/
|
||||||
|
|
||||||
|
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
|
||||||
|
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
|
||||||
|
|
||||||
ENV NODE_ENV=production
|
ENV NODE_ENV=production
|
||||||
|
|
||||||
# ---- Stage 4: API server runtime image ----
|
# ---- Stage 4: API server runtime image ----
|
||||||
|
|||||||
@@ -20,6 +20,11 @@ spec:
|
|||||||
env:
|
env:
|
||||||
- name: MANAGER_SOCKET_URL
|
- name: MANAGER_SOCKET_URL
|
||||||
value: "http://optima-api.optima.svc.cluster.local:8671"
|
value: "http://optima-api.optima.svc.cluster.local:8671"
|
||||||
|
- name: API_DATABASE_URL
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: api-env-secret
|
||||||
|
key: DATABASE_URL
|
||||||
envFrom:
|
envFrom:
|
||||||
- secretRef:
|
- secretRef:
|
||||||
name: api-env-secret
|
name: api-env-secret
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import PdfPrinter from "pdfmake/src/Printer";
|
import PdfPrinter from "pdfmake/src/Printer";
|
||||||
import { readFileSync } from "node:fs";
|
import { existsSync, readFileSync } from "node:fs";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
|
|
||||||
export interface QuoteLineItem {
|
export interface QuoteLineItem {
|
||||||
@@ -110,7 +110,26 @@ const COMPANY = {
|
|||||||
|
|
||||||
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
|
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
|
||||||
|
|
||||||
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto");
|
function resolveRobotoFontDir(): string {
|
||||||
|
const candidates = [
|
||||||
|
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (const dir of candidates) {
|
||||||
|
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
|
||||||
|
return dir;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error(
|
||||||
|
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const fontDir = resolveRobotoFontDir();
|
||||||
const fonts = {
|
const fonts = {
|
||||||
Roboto: {
|
Roboto: {
|
||||||
normal: join(fontDir, "Roboto-Regular.ttf"),
|
normal: join(fontDir, "Roboto-Regular.ttf"),
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { Server } from "socket.io";
|
import { Server } from "socket.io";
|
||||||
import { events, EventTypes } from "../globalEvents";
|
import { events, EventTypes } from "../globalEvents";
|
||||||
import { WorkerQueue } from "./queues";
|
import { WorkerQueue } from "./queues";
|
||||||
|
import { reserveWorkerId } from "../../workert";
|
||||||
|
|
||||||
function emitGlobalEvent<K extends keyof EventTypes>(
|
function emitGlobalEvent<K extends keyof EventTypes>(
|
||||||
name: K,
|
name: K,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { Socket } from "socket.io-client";
|
import { Socket } from "socket.io-client";
|
||||||
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
||||||
|
import { prisma } from "../../constants";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
||||||
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
|
|||||||
* Called every 5 seconds via PgBoss from the API process interval.
|
* Called every 5 seconds via PgBoss from the API process interval.
|
||||||
*/
|
*/
|
||||||
export async function executeIncrementalSync(): Promise<void> {
|
export async function executeIncrementalSync(): Promise<void> {
|
||||||
return executeForcedIncrementalDalpuriSync();
|
let jobRunId: string | undefined;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const run = await prisma.syncJobRun.create({
|
||||||
|
data: {
|
||||||
|
jobType: "INCREMENTAL_SYNC",
|
||||||
|
status: "RUNNING",
|
||||||
|
triggeredBy: "worker",
|
||||||
|
startedAt: new Date(),
|
||||||
|
},
|
||||||
|
select: { id: true },
|
||||||
|
});
|
||||||
|
jobRunId = run.id;
|
||||||
|
} catch (err) {
|
||||||
|
// Sync should still run even if tracking insert fails.
|
||||||
|
console.error("[sync] Failed to create incremental SyncJobRun", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await executeForcedIncrementalDalpuriSync({ jobRunId });
|
||||||
|
|
||||||
|
if (jobRunId) {
|
||||||
|
await prisma.syncJobRun.update({
|
||||||
|
where: { id: jobRunId },
|
||||||
|
data: {
|
||||||
|
status: "COMPLETED",
|
||||||
|
completedAt: new Date(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
if (jobRunId) {
|
||||||
|
const errorSummary = err instanceof Error ? err.message : String(err);
|
||||||
|
await prisma.syncJobRun
|
||||||
|
.update({
|
||||||
|
where: { id: jobRunId },
|
||||||
|
data: {
|
||||||
|
status: "FAILED",
|
||||||
|
completedAt: new Date(),
|
||||||
|
errorSummary: errorSummary.slice(0, 2000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.catch(() => {
|
||||||
|
// Best-effort update only.
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
|
|||||||
* Called on an interval from the main API process so it survives worker restarts.
|
* Called on an interval from the main API process so it survives worker restarts.
|
||||||
*/
|
*/
|
||||||
export async function enqueueIncrementalSync(): Promise<void> {
|
export async function enqueueIncrementalSync(): Promise<void> {
|
||||||
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {});
|
const jobId = await getBoss().send(
|
||||||
|
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
|
||||||
|
{
|
||||||
|
enqueuedAt: new Date().toISOString(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
singletonKey: "dalpuri-incremental-sync",
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!jobId) {
|
||||||
|
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
|
|||||||
queueType: WorkerQueue,
|
queueType: WorkerQueue,
|
||||||
workFn: (workerSocket: Socket) => Promise<T>,
|
workFn: (workerSocket: Socket) => Promise<T>,
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
|
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
// Request a worker ID and namespace from the manager
|
// Request a worker ID and namespace from the manager
|
||||||
socket.emit(
|
socket.emit(
|
||||||
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Connect to the worker-specific namespace
|
// Connect to the worker-specific namespace
|
||||||
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, {
|
const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
|
||||||
reconnection: false,
|
reconnection: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
+29
-1
@@ -163,6 +163,7 @@ if (import.meta.main) {
|
|||||||
// Register job handler for DALPURI_FULL_SYNC
|
// Register job handler for DALPURI_FULL_SYNC
|
||||||
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
||||||
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
||||||
|
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
|
||||||
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
||||||
const socket = await ensureManagerSocketReady();
|
const socket = await ensureManagerSocketReady();
|
||||||
await enqueueDalpuriFullSync(socket);
|
await enqueueDalpuriFullSync(socket);
|
||||||
@@ -170,10 +171,37 @@ if (import.meta.main) {
|
|||||||
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
||||||
|
|
||||||
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
||||||
await executeIncrementalSync();
|
const startedAt = Date.now();
|
||||||
|
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
|
||||||
|
try {
|
||||||
|
await executeIncrementalSync();
|
||||||
|
console.log(
|
||||||
|
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(
|
||||||
|
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
||||||
|
|
||||||
|
const enqueueIncrementalWithLogging = () => {
|
||||||
|
enqueueIncrementalSync().catch((err) => {
|
||||||
|
console.error(
|
||||||
|
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
|
||||||
|
);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
// Keep a worker-local 5s scheduler so incremental sync continues even when
|
||||||
|
// API interval scheduling is unavailable.
|
||||||
|
enqueueIncrementalWithLogging();
|
||||||
|
setInterval(enqueueIncrementalWithLogging, 5_000);
|
||||||
|
console.log("[worker] Started 5-second incremental enqueue interval");
|
||||||
|
|
||||||
// Register job handler for REFRESH_SALES_METRICS
|
// Register job handler for REFRESH_SALES_METRICS
|
||||||
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
||||||
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
||||||
|
|||||||
+82
-15
@@ -75,6 +75,43 @@ type DeleteResult = {
|
|||||||
|
|
||||||
let incrementalDeleteStepIndex = 0;
|
let incrementalDeleteStepIndex = 0;
|
||||||
|
|
||||||
|
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
||||||
|
"Companies",
|
||||||
|
"Company Addresses",
|
||||||
|
"Contacts",
|
||||||
|
]);
|
||||||
|
|
||||||
|
const criticalFullSyncIntervalMinutes = Math.max(
|
||||||
|
1,
|
||||||
|
Number.parseInt(
|
||||||
|
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "60",
|
||||||
|
10
|
||||||
|
) || 60
|
||||||
|
);
|
||||||
|
|
||||||
|
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||||
|
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||||
|
|
||||||
|
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||||
|
|
||||||
|
const shouldForceCriticalFullSync = (
|
||||||
|
step: Step,
|
||||||
|
forceIncremental: boolean
|
||||||
|
): boolean => {
|
||||||
|
if (!forceIncremental) return false;
|
||||||
|
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
|
||||||
|
|
||||||
|
const now = Date.now();
|
||||||
|
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
|
||||||
|
|
||||||
|
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastCriticalFullSyncByStep.set(step.name, now);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
const parseEnvFile = (path: string): Record<string, string> => {
|
const parseEnvFile = (path: string): Record<string, string> => {
|
||||||
const envData = readFileSync(path, "utf8");
|
const envData = readFileSync(path, "utf8");
|
||||||
const out: Record<string, string> = {};
|
const out: Record<string, string> = {};
|
||||||
@@ -107,6 +144,20 @@ const resolveApiDatabaseUrl = (): string => {
|
|||||||
if (process.env.OPTIMA_API_DATABASE_URL)
|
if (process.env.OPTIMA_API_DATABASE_URL)
|
||||||
return process.env.OPTIMA_API_DATABASE_URL;
|
return process.env.OPTIMA_API_DATABASE_URL;
|
||||||
|
|
||||||
|
// Worker/runtime fallback:
|
||||||
|
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
|
||||||
|
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
|
||||||
|
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
|
||||||
|
return process.env.DATABASE_URL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
process.env.DATABASE_URL &&
|
||||||
|
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
|
||||||
|
) {
|
||||||
|
return process.env.DATABASE_URL;
|
||||||
|
}
|
||||||
|
|
||||||
const candidates = [
|
const candidates = [
|
||||||
resolve(import.meta.dir, "../../api/.env"),
|
resolve(import.meta.dir, "../../api/.env"),
|
||||||
resolve(process.cwd(), "../api/.env"),
|
resolve(process.cwd(), "../api/.env"),
|
||||||
@@ -1351,6 +1402,15 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
|
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
|
||||||
|
|
||||||
const steps: Step[] = [
|
const steps: Step[] = [
|
||||||
|
{
|
||||||
|
name: "CW Members",
|
||||||
|
sourceModel: "member",
|
||||||
|
targetModel: "cwMember",
|
||||||
|
translation: cwMemberTranslation as unknown as AnyTranslation,
|
||||||
|
uniqueField: "cwMemberId",
|
||||||
|
sourceIdField: "memberRecId",
|
||||||
|
sourceUpdatedField: "lastUpdatedUtc",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "Users",
|
name: "Users",
|
||||||
sourceModel: "member",
|
sourceModel: "member",
|
||||||
@@ -1365,15 +1425,6 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "CW Members",
|
|
||||||
sourceModel: "member",
|
|
||||||
targetModel: "cwMember",
|
|
||||||
translation: cwMemberTranslation as unknown as AnyTranslation,
|
|
||||||
uniqueField: "cwMemberId",
|
|
||||||
sourceIdField: "memberRecId",
|
|
||||||
sourceUpdatedField: "lastUpdatedUtc",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "Companies",
|
name: "Companies",
|
||||||
sourceModel: "company",
|
sourceModel: "company",
|
||||||
@@ -1757,19 +1808,35 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
step,
|
step,
|
||||||
forceIncremental
|
forceIncremental
|
||||||
);
|
);
|
||||||
|
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||||
|
step,
|
||||||
|
forceIncremental
|
||||||
|
);
|
||||||
|
const effectiveDecision = forceCriticalFullSync
|
||||||
|
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||||
|
: decision;
|
||||||
|
|
||||||
|
if (forceCriticalFullSync) {
|
||||||
|
console.log(
|
||||||
|
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
const sourceIdsFilter =
|
const sourceIdsFilter =
|
||||||
decision.mode === "incremental" ? decision.sourceIds : undefined;
|
effectiveDecision.mode === "incremental"
|
||||||
|
? effectiveDecision.sourceIds
|
||||||
|
: undefined;
|
||||||
console.log(
|
console.log(
|
||||||
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
||||||
decision.mode
|
effectiveDecision.mode
|
||||||
}${
|
}${
|
||||||
decision.mode === "incremental"
|
effectiveDecision.mode === "incremental"
|
||||||
? ` (${decision.sourceIds.length} ids)`
|
? ` (${effectiveDecision.sourceIds.length} ids)`
|
||||||
: ""
|
: ""
|
||||||
}`
|
}`
|
||||||
);
|
);
|
||||||
if (logAllDifferences) {
|
if (logAllDifferences) {
|
||||||
logAllSmartSyncDifferences(step, decision.differences);
|
logAllSmartSyncDifferences(step, effectiveDecision.differences);
|
||||||
}
|
}
|
||||||
const result = await syncStep(
|
const result = await syncStep(
|
||||||
cwPrisma,
|
cwPrisma,
|
||||||
@@ -1791,7 +1858,7 @@ export const executeFullDalpuriSync = async (options?: {
|
|||||||
|
|
||||||
await writeStepLog(
|
await writeStepLog(
|
||||||
step.name,
|
step.name,
|
||||||
decision.mode,
|
effectiveDecision.mode,
|
||||||
result,
|
result,
|
||||||
{ deleted: 0, failed: 0 },
|
{ deleted: 0, failed: 0 },
|
||||||
Date.now() - stepStart
|
Date.now() - stepStart
|
||||||
|
|||||||
Reference in New Issue
Block a user