Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 57b5763d41 | |||
| 2bd498a35d | |||
| 86d7426e8b | |||
| afe56393e7 | |||
| b2cd26af30 | |||
| 0594816ea4 |
@@ -231,9 +231,10 @@ jobs:
|
||||
run: bun install --frozen-lockfile
|
||||
|
||||
- name: Rebuild native modules
|
||||
run: npm rebuild
|
||||
run: npm rebuild --ignore-scripts
|
||||
env:
|
||||
HUSKY: "0"
|
||||
HUSKY_SKIP_INSTALL: "1"
|
||||
|
||||
- name: Build macOS distributables
|
||||
run: bun run make:macos
|
||||
@@ -272,9 +273,10 @@ jobs:
|
||||
run: bun install --frozen-lockfile
|
||||
|
||||
- name: Rebuild native modules
|
||||
run: npm rebuild
|
||||
run: npm rebuild --ignore-scripts
|
||||
env:
|
||||
HUSKY: "0"
|
||||
HUSKY_SKIP_INSTALL: "1"
|
||||
|
||||
- name: Build Windows distributables
|
||||
run: bun run make -- --platform win32
|
||||
|
||||
@@ -90,6 +90,9 @@ COPY --from=build /app/dalpuri/generated/ ./dalpuri/generated/
|
||||
# Copy production node_modules (Prisma adapter needs native bindings)
|
||||
COPY --from=deps /app/node_modules/ ./node_modules/
|
||||
|
||||
# Ensure pdfmake Roboto fonts are present at runtime for PDF generation.
|
||||
COPY --from=build /app/api/node_modules/pdfmake/build/fonts/ ./node_modules/pdfmake/build/fonts/
|
||||
|
||||
ENV NODE_ENV=production
|
||||
|
||||
# ---- Stage 4: API server runtime image ----
|
||||
|
||||
@@ -20,6 +20,11 @@ spec:
|
||||
env:
|
||||
- name: MANAGER_SOCKET_URL
|
||||
value: "http://optima-api.optima.svc.cluster.local:8671"
|
||||
- name: API_DATABASE_URL
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: api-env-secret
|
||||
key: DATABASE_URL
|
||||
envFrom:
|
||||
- secretRef:
|
||||
name: api-env-secret
|
||||
|
||||
@@ -76,6 +76,20 @@ function mapRatingNameToInterest(
|
||||
return null;
|
||||
}
|
||||
|
||||
function formatOpportunityContactName(
|
||||
firstName?: string | null,
|
||||
lastName?: string | null
|
||||
): string {
|
||||
const first = (firstName ?? "").trim();
|
||||
const last = (lastName ?? "").trim();
|
||||
|
||||
if (first && last.toLowerCase() === "contact") {
|
||||
return first;
|
||||
}
|
||||
|
||||
return `${first} ${last}`.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Opportunity Controller
|
||||
*
|
||||
@@ -290,7 +304,7 @@ export class OpportunityController {
|
||||
| null
|
||||
| undefined;
|
||||
this.contactName = (data as any).contactName ?? (contactRel
|
||||
? `${contactRel.firstName} ${contactRel.lastName}`.trim()
|
||||
? formatOpportunityContactName(contactRel.firstName, contactRel.lastName)
|
||||
: null);
|
||||
|
||||
// Site
|
||||
@@ -674,7 +688,7 @@ export class OpportunityController {
|
||||
id: contact.id,
|
||||
contact: {
|
||||
id: contact.id,
|
||||
name: `${contact.firstName} ${contact.lastName}`.trim(),
|
||||
name: formatOpportunityContactName(contact.firstName, contact.lastName),
|
||||
},
|
||||
company: contact.company
|
||||
? {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import PdfPrinter from "pdfmake/src/Printer";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { existsSync, readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
export interface QuoteLineItem {
|
||||
@@ -110,7 +110,26 @@ const COMPANY = {
|
||||
|
||||
const DEFAULT_LOGO_PATH = join(process.cwd(), "logo.png");
|
||||
|
||||
const fontDir = join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto");
|
||||
function resolveRobotoFontDir(): string {
|
||||
const candidates = [
|
||||
join(process.cwd(), "node_modules/pdfmake/build/fonts/Roboto"),
|
||||
join(import.meta.dir, "../../../node_modules/pdfmake/build/fonts/Roboto"),
|
||||
join("/app/node_modules/pdfmake/build/fonts/Roboto"),
|
||||
join("/app/api/node_modules/pdfmake/build/fonts/Roboto"),
|
||||
];
|
||||
|
||||
for (const dir of candidates) {
|
||||
if (existsSync(join(dir, "Roboto-Medium.ttf"))) {
|
||||
return dir;
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`[pdf] Could not locate pdfmake Roboto fonts. Checked: ${candidates.join(", ")}`
|
||||
);
|
||||
}
|
||||
|
||||
const fontDir = resolveRobotoFontDir();
|
||||
const fonts = {
|
||||
Roboto: {
|
||||
normal: join(fontDir, "Roboto-Regular.ttf"),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { Socket } from "socket.io-client";
|
||||
import { executeFullDalpuriSync, executeForcedIncrementalDalpuriSync } from "dalpuri";
|
||||
import { prisma } from "../../constants";
|
||||
|
||||
/**
|
||||
* Execute a full sync from Dalpuri (ConnectWise) to the API database.
|
||||
@@ -14,5 +15,53 @@ export async function executeFullSync(_workerSocket: Socket): Promise<void> {
|
||||
* Called every 5 seconds via PgBoss from the API process interval.
|
||||
*/
|
||||
export async function executeIncrementalSync(): Promise<void> {
|
||||
return executeForcedIncrementalDalpuriSync();
|
||||
let jobRunId: string | undefined;
|
||||
|
||||
try {
|
||||
const run = await prisma.syncJobRun.create({
|
||||
data: {
|
||||
jobType: "INCREMENTAL_SYNC",
|
||||
status: "RUNNING",
|
||||
triggeredBy: "worker",
|
||||
startedAt: new Date(),
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
jobRunId = run.id;
|
||||
} catch (err) {
|
||||
// Sync should still run even if tracking insert fails.
|
||||
console.error("[sync] Failed to create incremental SyncJobRun", err);
|
||||
}
|
||||
|
||||
try {
|
||||
await executeForcedIncrementalDalpuriSync({ jobRunId });
|
||||
|
||||
if (jobRunId) {
|
||||
await prisma.syncJobRun.update({
|
||||
where: { id: jobRunId },
|
||||
data: {
|
||||
status: "COMPLETED",
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
if (jobRunId) {
|
||||
const errorSummary = err instanceof Error ? err.message : String(err);
|
||||
await prisma.syncJobRun
|
||||
.update({
|
||||
where: { id: jobRunId },
|
||||
data: {
|
||||
status: "FAILED",
|
||||
completedAt: new Date(),
|
||||
errorSummary: errorSummary.slice(0, 2000),
|
||||
},
|
||||
})
|
||||
.catch(() => {
|
||||
// Best-effort update only.
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
+171
-6
@@ -75,6 +75,136 @@ type DeleteResult = {
|
||||
|
||||
let incrementalDeleteStepIndex = 0;
|
||||
|
||||
const CRITICAL_INCREMENTAL_RECONCILE_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
"Opportunities",
|
||||
]);
|
||||
|
||||
const CRITICAL_CW_WATERMARK_TABLES = new Set([
|
||||
"Companies",
|
||||
"Company Addresses",
|
||||
"Contacts",
|
||||
"Opportunities",
|
||||
]);
|
||||
|
||||
const criticalFullSyncIntervalMinutes = Math.max(
|
||||
1,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_FULL_SYNC_INTERVAL_MINUTES ?? "15",
|
||||
10
|
||||
) || 15
|
||||
);
|
||||
|
||||
const CRITICAL_FULL_SYNC_INTERVAL_MS =
|
||||
criticalFullSyncIntervalMinutes * 60 * 1000;
|
||||
|
||||
const criticalCwWatermarkOverlapSeconds = Math.max(
|
||||
5,
|
||||
Number.parseInt(
|
||||
process.env.DALPURI_CRITICAL_CW_WATERMARK_OVERLAP_SECONDS ?? "60",
|
||||
10
|
||||
) || 60
|
||||
);
|
||||
|
||||
const CRITICAL_CW_WATERMARK_OVERLAP_MS =
|
||||
criticalCwWatermarkOverlapSeconds * 1000;
|
||||
|
||||
const criticalCwDeltaLimit = Math.max(
|
||||
100,
|
||||
Number.parseInt(process.env.DALPURI_CRITICAL_CW_DELTA_LIMIT ?? "5000", 10) ||
|
||||
5000
|
||||
);
|
||||
|
||||
const lastCriticalFullSyncByStep = new Map<string, number>();
|
||||
const lastCriticalCwWatermarkByStep = new Map<string, Date>();
|
||||
|
||||
const shouldForceCriticalFullSync = (
|
||||
step: Step,
|
||||
forceIncremental: boolean
|
||||
): boolean => {
|
||||
if (!forceIncremental) return false;
|
||||
if (!CRITICAL_INCREMENTAL_RECONCILE_TABLES.has(step.name)) return false;
|
||||
|
||||
const now = Date.now();
|
||||
const last = lastCriticalFullSyncByStep.get(step.name) ?? 0;
|
||||
|
||||
if (now - last < CRITICAL_FULL_SYNC_INTERVAL_MS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
lastCriticalFullSyncByStep.set(step.name, now);
|
||||
return true;
|
||||
};
|
||||
|
||||
const computeCriticalCwWatermarkDecision = async (
|
||||
cwPrisma: CwPrismaClient,
|
||||
step: Step,
|
||||
forceIncremental: boolean
|
||||
): Promise<SmartSyncDecision | null> => {
|
||||
if (!forceIncremental) return null;
|
||||
if (!CRITICAL_CW_WATERMARK_TABLES.has(step.name)) return null;
|
||||
|
||||
const cwDelegate = (
|
||||
cwPrisma as unknown as Record<string, { findMany: Function } | undefined>
|
||||
)[step.sourceModel];
|
||||
|
||||
if (!cwDelegate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const existingWhere =
|
||||
(step.sourceArgs as Record<string, unknown> | undefined)?.where ?? {};
|
||||
|
||||
const lastWatermark = lastCriticalCwWatermarkByStep.get(step.name);
|
||||
const lowerBound = lastWatermark
|
||||
? new Date(lastWatermark.getTime() - CRITICAL_CW_WATERMARK_OVERLAP_MS)
|
||||
: new Date(Date.now() - CRITICAL_CW_WATERMARK_OVERLAP_MS);
|
||||
|
||||
const rows = (await cwDelegate.findMany({
|
||||
select: {
|
||||
[step.sourceIdField]: true,
|
||||
[step.sourceUpdatedField]: true,
|
||||
},
|
||||
where: {
|
||||
...(existingWhere as Record<string, unknown>),
|
||||
[step.sourceUpdatedField]: {
|
||||
gte: lowerBound,
|
||||
},
|
||||
},
|
||||
orderBy: { [step.sourceUpdatedField]: "asc" },
|
||||
take: criticalCwDeltaLimit,
|
||||
})) as Row[];
|
||||
|
||||
if (rows.length >= criticalCwDeltaLimit) {
|
||||
console.warn(
|
||||
` [smart-sync][critical-watermark] ${step.name}: delta reached limit (${criticalCwDeltaLimit}), forcing full sync`
|
||||
);
|
||||
return { mode: "full", differences: [] };
|
||||
}
|
||||
|
||||
if (rows.length > 0) {
|
||||
const latest = rows[rows.length - 1][step.sourceUpdatedField] as Date | null;
|
||||
if (latest) {
|
||||
lastCriticalCwWatermarkByStep.set(step.name, latest);
|
||||
}
|
||||
} else if (!lastWatermark) {
|
||||
lastCriticalCwWatermarkByStep.set(step.name, new Date());
|
||||
}
|
||||
|
||||
const sourceIds = rows.map((r) => r[step.sourceIdField] as number);
|
||||
console.log(
|
||||
` [smart-sync][critical-watermark] ${step.name}: ${sourceIds.length} ids since ${lowerBound.toISOString()}`
|
||||
);
|
||||
|
||||
return {
|
||||
mode: "incremental",
|
||||
sourceIds,
|
||||
differences: [],
|
||||
};
|
||||
};
|
||||
|
||||
const parseEnvFile = (path: string): Record<string, string> => {
|
||||
const envData = readFileSync(path, "utf8");
|
||||
const out: Record<string, string> = {};
|
||||
@@ -107,6 +237,20 @@ const resolveApiDatabaseUrl = (): string => {
|
||||
if (process.env.OPTIMA_API_DATABASE_URL)
|
||||
return process.env.OPTIMA_API_DATABASE_URL;
|
||||
|
||||
// Worker/runtime fallback:
|
||||
// In Kubernetes we often provide CW via CW_DATABASE_URL and API Postgres via
|
||||
// DATABASE_URL. Only use DATABASE_URL as API when we can safely infer that.
|
||||
if (process.env.CW_DATABASE_URL && process.env.DATABASE_URL) {
|
||||
return process.env.DATABASE_URL;
|
||||
}
|
||||
|
||||
if (
|
||||
process.env.DATABASE_URL &&
|
||||
/^(postgres|postgresql):\/\//i.test(process.env.DATABASE_URL)
|
||||
) {
|
||||
return process.env.DATABASE_URL;
|
||||
}
|
||||
|
||||
const candidates = [
|
||||
resolve(import.meta.dir, "../../api/.env"),
|
||||
resolve(process.cwd(), "../api/.env"),
|
||||
@@ -1757,19 +1901,40 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const criticalWatermarkDecision = await computeCriticalCwWatermarkDecision(
|
||||
cwPrisma,
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const forceCriticalFullSync = shouldForceCriticalFullSync(
|
||||
step,
|
||||
forceIncremental
|
||||
);
|
||||
const effectiveDecision = forceCriticalFullSync
|
||||
? ({ mode: "full", differences: decision.differences } as SmartSyncDecision)
|
||||
: criticalWatermarkDecision ?? decision;
|
||||
|
||||
if (forceCriticalFullSync) {
|
||||
console.log(
|
||||
` [smart-sync][forced-full] ${step.name}: forcing periodic full reconciliation every ${criticalFullSyncIntervalMinutes}m`
|
||||
);
|
||||
}
|
||||
|
||||
const sourceIdsFilter =
|
||||
decision.mode === "incremental" ? decision.sourceIds : undefined;
|
||||
effectiveDecision.mode === "incremental"
|
||||
? effectiveDecision.sourceIds
|
||||
: undefined;
|
||||
console.log(
|
||||
` [smart-sync]${forceIncremental ? "[forced]" : ""} mode=${
|
||||
decision.mode
|
||||
effectiveDecision.mode
|
||||
}${
|
||||
decision.mode === "incremental"
|
||||
? ` (${decision.sourceIds.length} ids)`
|
||||
effectiveDecision.mode === "incremental"
|
||||
? ` (${effectiveDecision.sourceIds.length} ids)`
|
||||
: ""
|
||||
}`
|
||||
);
|
||||
if (logAllDifferences) {
|
||||
logAllSmartSyncDifferences(step, decision.differences);
|
||||
logAllSmartSyncDifferences(step, effectiveDecision.differences);
|
||||
}
|
||||
const result = await syncStep(
|
||||
cwPrisma,
|
||||
@@ -1791,7 +1956,7 @@ export const executeFullDalpuriSync = async (options?: {
|
||||
|
||||
await writeStepLog(
|
||||
step.name,
|
||||
decision.mode,
|
||||
effectiveDecision.mode,
|
||||
result,
|
||||
{ deleted: 0, failed: 0 },
|
||||
Date.now() - stepStart
|
||||
|
||||
@@ -46,7 +46,7 @@ export const contactTranslation: Translation<CwContact, ApiContact> = {
|
||||
{
|
||||
from: "lastName",
|
||||
to: "lastName",
|
||||
process: (value) => (value ? value : "Contact"),
|
||||
process: (value) => (value ? value : ""),
|
||||
},
|
||||
{ from: "nickName", to: "nickname" },
|
||||
{ from: "title", to: "title" },
|
||||
|
||||
Reference in New Issue
Block a user