diff --git a/api/src/modules/workers/coms.ts b/api/src/modules/workers/coms.ts index 0ef4a64..79a2e46 100644 --- a/api/src/modules/workers/coms.ts +++ b/api/src/modules/workers/coms.ts @@ -1,6 +1,7 @@ import { Server } from "socket.io"; import { events, EventTypes } from "../globalEvents"; import { WorkerQueue } from "./queues"; +import { reserveWorkerId } from "../../workert"; function emitGlobalEvent( name: K, diff --git a/api/src/modules/workers/incremental-sync.ts b/api/src/modules/workers/incremental-sync.ts index c455f2b..bd7533e 100644 --- a/api/src/modules/workers/incremental-sync.ts +++ b/api/src/modules/workers/incremental-sync.ts @@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues"; * Called on an interval from the main API process so it survives worker restarts. */ export async function enqueueIncrementalSync(): Promise { - await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {}); + const jobId = await getBoss().send( + WorkerQueue.DALPURI_INCREMENTAL_SYNC, + { + enqueuedAt: new Date().toISOString(), + }, + { + singletonKey: "dalpuri-incremental-sync", + } + ); + + if (!jobId) { + console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active"); + } } diff --git a/api/src/modules/workers/jobFactory.ts b/api/src/modules/workers/jobFactory.ts index 1766076..2daf160 100644 --- a/api/src/modules/workers/jobFactory.ts +++ b/api/src/modules/workers/jobFactory.ts @@ -32,6 +32,8 @@ export async function createWorkerJob( queueType: WorkerQueue, workFn: (workerSocket: Socket) => Promise, ): Promise { + const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"; + return new Promise((resolve, reject) => { // Request a worker ID and namespace from the manager socket.emit( @@ -53,7 +55,7 @@ export async function createWorkerJob( } // Connect to the worker-specific namespace - const workerSocket = io(`http://localhost:8671/worker-${workerId}`, { + const workerSocket = io(`${managerUrl}/worker-${workerId}`, { reconnection: false, }); diff --git a/api/src/workert.ts b/api/src/workert.ts index 3c4b630..dccef23 100644 --- a/api/src/workert.ts +++ b/api/src/workert.ts @@ -163,6 +163,7 @@ if (import.meta.main) { // Register job handler for DALPURI_FULL_SYNC const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); + const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync"); await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { const socket = await ensureManagerSocketReady(); await enqueueDalpuriFullSync(socket); @@ -170,10 +171,37 @@ if (import.meta.main) { console.log("[worker] Registered DALPURI_FULL_SYNC job handler"); await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => { - await executeIncrementalSync(); + const startedAt = Date.now(); + console.log("[worker] DALPURI_INCREMENTAL_SYNC started"); + try { + await executeIncrementalSync(); + console.log( + `[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms` + ); + } catch (err) { + console.error( + `[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`, + err + ); + throw err; + } }); console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler"); + const enqueueIncrementalWithLogging = () => { + enqueueIncrementalSync().catch((err) => { + console.error( + `[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}` + ); + }); + }; + + // Keep a worker-local 5s scheduler so incremental sync continues even when + // API interval scheduling is unavailable. + enqueueIncrementalWithLogging(); + setInterval(enqueueIncrementalWithLogging, 5_000); + console.log("[worker] Started 5-second incremental enqueue interval"); + // Register job handler for REFRESH_SALES_METRICS const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics"); await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {