Compare commits

..

2 Commits

Author SHA1 Message Date
HoloPanio 71fe36c0b8 fix(worker): restore reliable 5s incremental sync cadence 2026-04-10 01:00:04 +00:00
HoloPanio e0d575454e fix(dalpuri): sync CW Members before Users to resolve FK ordering issue
User rows have a FK constraint to CwMember (User_cwMemberId_fkey). Syncing
Users first caused all 140 User upserts to fail since the CwMember table was
empty. This cascade failure then caused all Opportunity upserts to fail because
Opportunity.primarySalesRepId is FK-constrained to User.cwIdentifier.

Fix: reorder steps so CW Members syncs first, then Users.
2026-04-09 01:04:00 +00:00
5 changed files with 55 additions and 12 deletions
+1
View File
@@ -1,6 +1,7 @@
import { Server } from "socket.io"; import { Server } from "socket.io";
import { events, EventTypes } from "../globalEvents"; import { events, EventTypes } from "../globalEvents";
import { WorkerQueue } from "./queues"; import { WorkerQueue } from "./queues";
import { reserveWorkerId } from "../../workert";
function emitGlobalEvent<K extends keyof EventTypes>( function emitGlobalEvent<K extends keyof EventTypes>(
name: K, name: K,
+13 -1
View File
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
* Called on an interval from the main API process so it survives worker restarts. * Called on an interval from the main API process so it survives worker restarts.
*/ */
export async function enqueueIncrementalSync(): Promise<void> { export async function enqueueIncrementalSync(): Promise<void> {
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {}); const jobId = await getBoss().send(
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
{
enqueuedAt: new Date().toISOString(),
},
{
singletonKey: "dalpuri-incremental-sync",
}
);
if (!jobId) {
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
}
} }
+3 -1
View File
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
queueType: WorkerQueue, queueType: WorkerQueue,
workFn: (workerSocket: Socket) => Promise<T>, workFn: (workerSocket: Socket) => Promise<T>,
): Promise<T> { ): Promise<T> {
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
// Request a worker ID and namespace from the manager // Request a worker ID and namespace from the manager
socket.emit( socket.emit(
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
} }
// Connect to the worker-specific namespace // Connect to the worker-specific namespace
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, { const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
reconnection: false, reconnection: false,
}); });
+29 -1
View File
@@ -163,6 +163,7 @@ if (import.meta.main) {
// Register job handler for DALPURI_FULL_SYNC // Register job handler for DALPURI_FULL_SYNC
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
const socket = await ensureManagerSocketReady(); const socket = await ensureManagerSocketReady();
await enqueueDalpuriFullSync(socket); await enqueueDalpuriFullSync(socket);
@@ -170,10 +171,37 @@ if (import.meta.main) {
console.log("[worker] Registered DALPURI_FULL_SYNC job handler"); console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => { await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
await executeIncrementalSync(); const startedAt = Date.now();
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
try {
await executeIncrementalSync();
console.log(
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
);
} catch (err) {
console.error(
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
err
);
throw err;
}
}); });
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler"); console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
const enqueueIncrementalWithLogging = () => {
enqueueIncrementalSync().catch((err) => {
console.error(
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
);
});
};
// Keep a worker-local 5s scheduler so incremental sync continues even when
// API interval scheduling is unavailable.
enqueueIncrementalWithLogging();
setInterval(enqueueIncrementalWithLogging, 5_000);
console.log("[worker] Started 5-second incremental enqueue interval");
// Register job handler for REFRESH_SALES_METRICS // Register job handler for REFRESH_SALES_METRICS
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics"); const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => { await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
+9 -9
View File
@@ -1351,6 +1351,15 @@ export const executeFullDalpuriSync = async (options?: {
const isTimedOut = () => Date.now() - syncStartTime > timeoutMs; const isTimedOut = () => Date.now() - syncStartTime > timeoutMs;
const steps: Step[] = [ const steps: Step[] = [
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Users", name: "Users",
sourceModel: "member", sourceModel: "member",
@@ -1365,15 +1374,6 @@ export const executeFullDalpuriSync = async (options?: {
}, },
}, },
}, },
{
name: "CW Members",
sourceModel: "member",
targetModel: "cwMember",
translation: cwMemberTranslation as unknown as AnyTranslation,
uniqueField: "cwMemberId",
sourceIdField: "memberRecId",
sourceUpdatedField: "lastUpdatedUtc",
},
{ {
name: "Companies", name: "Companies",
sourceModel: "company", sourceModel: "company",