import { io, Socket } from "socket.io-client"; import { WorkerQueue } from "./modules/workers/queues"; import { setupEventDebugger } from "./modules/logging/eventDebugger"; import { boss, getBoss } from "./boss-instance"; let bossStartPromise: Promise | null = null; let reservationQueueReady = false; let managerSocket: Socket | null = null; let managerSocketReadyPromise: Promise | null = null; async function ensureBossStarted(): Promise { if (bossStartPromise) return bossStartPromise; bossStartPromise = boss.start().then(() => undefined); await bossStartPromise; } async function ensureReservationQueue(): Promise { if (reservationQueueReady) return; try { await boss.createQueue(WorkerQueue.WORKER_NAMESPACE_RESERVATION); } catch { // Queue may already exist; ignore to keep this idempotent. } reservationQueueReady = true; } export function ensureManagerSocketReady(): Promise { if (managerSocket && managerSocket.connected) return Promise.resolve(managerSocket); if (managerSocketReadyPromise) return managerSocketReadyPromise; const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"; managerSocket = io(managerUrl, { reconnection: true, reconnectionDelay: 1000, reconnectionDelayMax: 5000, reconnectionAttempts: Infinity, }); managerSocket.on("connect", () => { console.log("[worker] Connected to manager socket on :8671"); }); managerSocket.on("disconnect", (reason) => { console.warn(`[worker] Manager socket disconnected: ${reason}`); }); managerSocket.on("connect_error", (err) => { console.error("[worker] Manager socket connect_error", err.message); }); managerSocket.on("reconnect_attempt", () => { console.log("[worker] Attempting to reconnect to manager socket..."); }); managerSocketReadyPromise = new Promise((resolve) => { const socket = managerSocket!; const onConnect = () => { socket.off("connect_error", onConnectError); resolve(socket); }; const onConnectError = (err: Error) => { // Log the error but don't reject - let Socket.IO handle reconnection console.warn("[worker] Connection attempt failed:", err.message, "- retrying..."); }; socket.once("connect", onConnect); socket.on("connect_error", onConnectError); }); return managerSocketReadyPromise; } /** * Ask PgBoss to generate a durable job ID we can reuse as a worker namespace ID. * * A short-lived reservation job is created and immediately canceled to avoid * growing the queue while still obtaining a PgBoss-generated ID. */ export async function reserveWorkerId(queueType: WorkerQueue): Promise { await ensureBossStarted(); await ensureReservationQueue(); const workerId = await boss.send(WorkerQueue.WORKER_NAMESPACE_RESERVATION, { queueType, requestedAt: new Date().toISOString(), }); if (!workerId) { throw new Error("Failed to reserve PgBoss job ID for worker namespace"); } try { await boss.cancel(WorkerQueue.WORKER_NAMESPACE_RESERVATION, workerId); } catch { // Best-effort cleanup only. } return workerId; } async function ensureDalpuriSyncQueue(): Promise { try { console.log("[worker] Creating DALPURI_FULL_SYNC queue..."); await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC); console.log("[worker] DALPURI_FULL_SYNC queue ready"); } catch (err) { console.log("[worker] DALPURI_FULL_SYNC queue already exists (or error):", (err as Error).message); } try { console.log("[worker] Creating DALPURI_INCREMENTAL_SYNC queue..."); await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC); console.log("[worker] DALPURI_INCREMENTAL_SYNC queue ready"); } catch (err) { console.log("[worker] DALPURI_INCREMENTAL_SYNC queue already exists (or error):", (err as Error).message); } try { console.log("[worker] Creating REFRESH_SALES_METRICS queue..."); await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS); console.log("[worker] REFRESH_SALES_METRICS queue ready"); } catch (err) { console.log("[worker] REFRESH_SALES_METRICS queue already exists (or error):", (err as Error).message); } } /** * Initialize the worker system. Must be called before enqueueing jobs. * Starts PgBoss connection to PostgreSQL. */ export async function initializeWorkerSystem(): Promise { await ensureBossStarted(); await ensureReservationQueue(); await ensureDalpuriSyncQueue(); console.log("[worker] Worker system initialized - ready for job enqueueing"); } if (import.meta.main) { // if (Bun.env.NODE_ENV === "development") { // setupEventDebugger({ processLabel: "WORKER" }); // } console.log("[worker] Worker process starting..."); console.log( `[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}` ); console.log(`[worker] DATABASE_URL set: ${!!process.env.DATABASE_URL}`); // Ensure PgBoss is connected and queues exist console.log("[worker] Starting PgBoss..."); try { await Promise.race([ ensureBossStarted(), new Promise((_, reject) => setTimeout(() => reject(new Error("boss.start() timed out after 30s")), 30_000) ), ]); } catch (err) { console.error("[worker] FATAL: PgBoss failed to start:", err); process.exit(1); } console.log("[worker] PgBoss started successfully"); console.log("[worker] Ensuring sync queues..."); await ensureDalpuriSyncQueue(); console.log("[worker] Sync queues ready"); // Register job handler for DALPURI_FULL_SYNC console.log("[worker] Importing sync-manager..."); const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); console.log("[worker] Importing dalpuri-sync..."); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); console.log("[worker] Importing incremental-sync..."); const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync"); await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { const socket = await ensureManagerSocketReady(); await enqueueDalpuriFullSync(socket); }); console.log("[worker] Registered DALPURI_FULL_SYNC job handler"); await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => { const startedAt = Date.now(); console.log("[worker] DALPURI_INCREMENTAL_SYNC started"); try { await executeIncrementalSync(); console.log( `[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms` ); } catch (err) { console.error( `[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`, err ); throw err; } }); console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler"); const enqueueIncrementalWithLogging = () => { enqueueIncrementalSync().catch((err) => { console.error( `[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}` ); }); }; // Keep a worker-local 5s scheduler so incremental sync continues even when // API interval scheduling is unavailable. enqueueIncrementalWithLogging(); setInterval(enqueueIncrementalWithLogging, 5_000); console.log("[worker] Started 5-second incremental enqueue interval"); // Register job handler for REFRESH_SALES_METRICS const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics"); await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => { const job = Array.isArray(jobs) ? jobs[0] : jobs; const data = job?.data as { forceColdLoad?: boolean } | undefined; const forceColdLoad = data?.forceColdLoad ?? false; await executeSalesMetricsRefresh({ forceColdLoad }); }); console.log("[worker] Registered REFRESH_SALES_METRICS job handler"); // Initiate manager socket connection (will reconnect automatically if it fails) ensureManagerSocketReady().catch((err) => { console.error("[worker] Failed to establish manager socket:", err.message); console.log("[worker] Will continue retrying in background..."); }); console.log("[worker] Worker process ready - waiting for job requests"); // Keep process alive process.on("SIGTERM", async () => { console.log("[worker] SIGTERM received, shutting down gracefully"); await boss.stop(); process.exit(0); }); }