230 lines
7.3 KiB
TypeScript
230 lines
7.3 KiB
TypeScript
import { PgBoss } from "pg-boss";
|
|
import { io, Socket } from "socket.io-client";
|
|
import { WorkerQueue } from "./modules/workers/queues";
|
|
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
|
|
|
// Single PgBoss client shared by everything in this module. The connection
// string comes from the DATABASE_URL environment variable.
const boss = new PgBoss(process.env.DATABASE_URL!);

// Write errors emitted by PgBoss to the worker log.
boss.on("error", (err) => console.error("[worker] PgBoss error", err));

// Memoized boss.start() promise, so repeated callers share one startup.
let bossStartPromise: Promise<void> | null = null;

// Flipped to true after the reservation queue creation has been attempted.
let reservationQueueReady = false;

// Socket.IO client for MANAGER_SOCKET_URL; created on first use.
let managerSocket: Socket | null = null;

// Memoized promise that resolves with the manager socket once it connects.
let managerSocketReadyPromise: Promise<Socket> | null = null;
|
|
|
|
/**
 * Start PgBoss at most once and let every caller share the same startup.
 *
 * The in-flight (or completed) startup is memoized in `bossStartPromise`.
 * If startup fails, the memo is cleared so that a later call can try again
 * instead of receiving the same cached rejection forever.
 *
 * @returns A promise that resolves once `boss.start()` has completed.
 * @throws Whatever `boss.start()` rejects with.
 */
async function ensureBossStarted(): Promise<void> {
  if (bossStartPromise) return bossStartPromise;

  const startup = boss.start().then(() => undefined);
  bossStartPromise = startup;

  try {
    await startup;
  } catch (err: unknown) {
    // Only clear the memo if nobody replaced it in the meantime.
    if (bossStartPromise === startup) bossStartPromise = null;
    throw err;
  }
}
|
|
|
|
/**
 * Create the worker-namespace reservation queue the first time it is needed.
 *
 * Creation is best-effort: a failure (for example because the queue already
 * exists) does not abort the caller, but it is logged so that genuine
 * problems such as connection or permission errors are not hidden. The
 * attempt is made once per process; later calls return immediately.
 */
async function ensureReservationQueue(): Promise<void> {
  if (reservationQueueReady) return;

  const queueName = WorkerQueue.WORKER_NAMESPACE_RESERVATION;
  try {
    await boss.createQueue(queueName);
  } catch (err: unknown) {
    // Tolerated so the call stays idempotent, but no longer silent.
    console.warn(`[worker] createQueue(${queueName}) failed; continuing`, err);
  }

  reservationQueueReady = true;
}
|
|
|
|
/**
 * Create the Socket.IO client for the manager and attach its log listeners.
 *
 * The URL is read from MANAGER_SOCKET_URL and falls back to
 * http://localhost:8671. Automatic reconnection is enabled with an unlimited
 * number of attempts.
 */
function createManagerSocket(): Socket {
  const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
  const socket = io(managerUrl, {
    reconnection: true,
    reconnectionDelay: 1000,
    reconnectionDelayMax: 5000,
    reconnectionAttempts: Infinity,
  });

  socket.on("connect", () => {
    // Log the URL actually used rather than a hard-coded port.
    console.log(`[worker] Connected to manager socket at ${managerUrl}`);
  });

  socket.on("disconnect", (reason) => {
    console.warn(`[worker] Manager socket disconnected: ${reason}`);
  });

  // Single connect_error listener; failures are retried by Socket.IO itself.
  socket.on("connect_error", (err) => {
    console.error("[worker] Manager socket connect_error", err.message);
  });

  // Reconnection events are emitted by the Manager (socket.io), not by the
  // Socket, in socket.io-client v3 and later.
  socket.io.on("reconnect_attempt", () => {
    console.log("[worker] Attempting to reconnect to manager socket...");
  });

  return socket;
}

/**
 * Resolve with the manager socket once it is connected.
 *
 * - If the socket is already connected, resolves immediately.
 * - If a connection is pending, returns the shared pending promise.
 * - Otherwise creates the socket (first call) or reuses the existing,
 *   currently disconnected one and waits for its next "connect" event.
 *
 * The returned promise never rejects: connection failures are logged and
 * retried in the background.
 *
 * @returns A promise resolving with the connected manager socket.
 */
export function ensureManagerSocketReady(): Promise<Socket> {
  if (managerSocket?.connected) return Promise.resolve(managerSocket);
  if (managerSocketReadyPromise) return managerSocketReadyPromise;

  let socket = managerSocket;
  if (socket) {
    // Existing socket that has dropped: make sure a connection attempt is
    // under way even if automatic reconnection is not active (for example
    // after a server-initiated disconnect).
    socket.connect();
  } else {
    socket = createManagerSocket();
    managerSocket = socket;
  }

  const target = socket;
  managerSocketReadyPromise = new Promise<Socket>((resolve) => {
    target.once("connect", () => {
      // Forget the memo once connected so that a call made after a later
      // disconnect waits for the next connect instead of reusing this result.
      managerSocketReadyPromise = null;
      resolve(target);
    });
  });

  return managerSocketReadyPromise;
}
|
|
|
|
/**
 * Ask PgBoss to generate a durable job ID that can be reused as a worker
 * namespace ID.
 *
 * A short-lived reservation job is sent and then immediately canceled, so the
 * reservation queue does not grow while a PgBoss-generated ID is still
 * obtained. Cancellation is best-effort: a failure is logged and the ID is
 * returned anyway.
 *
 * @param queueType Queue the worker namespace is being reserved for; stored
 *   in the reservation job payload together with the request timestamp.
 * @returns The PgBoss job ID of the reservation job.
 * @throws Error if PgBoss does not return a job ID, or if starting PgBoss or
 *   sending the job fails.
 */
export async function reserveWorkerId(queueType: WorkerQueue): Promise<string> {
  await ensureBossStarted();
  await ensureReservationQueue();

  const reservationQueue = WorkerQueue.WORKER_NAMESPACE_RESERVATION;
  const workerId = await boss.send(reservationQueue, {
    queueType,
    requestedAt: new Date().toISOString(),
  });

  if (!workerId) {
    throw new Error("Failed to reserve PgBoss job ID for worker namespace");
  }

  try {
    await boss.cancel(reservationQueue, workerId);
  } catch (err: unknown) {
    // Best-effort cleanup only, but surface it: uncanceled reservation jobs
    // may accumulate in the queue.
    console.warn(`[worker] Failed to cancel reservation job ${workerId}`, err);
  }

  return workerId;
}
|
|
|
|
/**
 * Create the queues used by the sync and metrics job handlers:
 * DALPURI_FULL_SYNC, DALPURI_INCREMENTAL_SYNC and REFRESH_SALES_METRICS.
 *
 * Queues are created one after another. Each creation is best-effort: a
 * failure (for example because the queue already exists) is logged and the
 * remaining queues are still attempted.
 */
async function ensureDalpuriSyncQueue(): Promise<void> {
  const queueNames = [
    WorkerQueue.DALPURI_FULL_SYNC,
    WorkerQueue.DALPURI_INCREMENTAL_SYNC,
    WorkerQueue.REFRESH_SALES_METRICS,
  ];

  for (const queueName of queueNames) {
    try {
      await boss.createQueue(queueName);
    } catch (err: unknown) {
      // Tolerated so the call stays idempotent, but no longer silent.
      console.warn(`[worker] createQueue(${queueName}) failed; continuing`, err);
    }
  }
}
|
|
|
|
/**
 * Prepare the worker system for job enqueueing.
 *
 * Runs, in order: PgBoss startup, creation of the reservation queue, and
 * creation of the sync/metrics queues. Call this before enqueueing jobs.
 */
export async function initializeWorkerSystem(): Promise<void> {
  const setupSteps = [
    ensureBossStarted,
    ensureReservationQueue,
    ensureDalpuriSyncQueue,
  ];

  // Steps depend on each other, so they run strictly one after another.
  for (const runStep of setupSteps) {
    await runStep();
  }

  console.log("[worker] Worker system initialized - ready for job enqueueing");
}
|
|
|
|
/**
 * Expose the module's shared PgBoss instance for direct job enqueueing.
 *
 * This accessor does not start PgBoss; call initializeWorkerSystem() first.
 *
 * @returns The shared PgBoss instance.
 */
export function getBoss(): PgBoss {
  return boss;
}
|
|
|
|
// Worker entry point: runs only when this file is executed directly, not
// when it is imported for its exported helpers.
if (import.meta.main) {
  // if (Bun.env.NODE_ENV === "development") {
  //   setupEventDebugger({ processLabel: "WORKER" });
  // }

  console.log("[worker] Worker process starting...");
  console.log(
    `[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}`
  );

  // Ensure PgBoss is connected and the sync/metrics queues exist.
  await ensureBossStarted();
  await ensureDalpuriSyncQueue();

  // Handler modules are loaded lazily, only when running as the worker.
  const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
  const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
  const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");

  // DALPURI_FULL_SYNC: waits for the manager socket, then hands it to the
  // full-sync routine.
  await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
    const socket = await ensureManagerSocketReady();
    await enqueueDalpuriFullSync(socket);
  });
  console.log("[worker] Registered DALPURI_FULL_SYNC job handler");

  // DALPURI_INCREMENTAL_SYNC: runs the incremental sync and logs its
  // duration; errors are rethrown so PgBoss sees the job as failed.
  await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
    const startedAt = Date.now();
    console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
    try {
      await executeIncrementalSync();
      console.log(
        `[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
      );
    } catch (err) {
      console.error(
        `[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
        err
      );
      throw err;
    }
  });
  console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");

  // Fire-and-forget enqueue; failures are logged and never thrown.
  const enqueueIncrementalWithLogging = () => {
    enqueueIncrementalSync().catch((err) => {
      console.error(
        `[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
      );
    });
  };

  // Keep a worker-local 5s scheduler so incremental sync continues even when
  // API interval scheduling is unavailable. The timer handle is kept so the
  // scheduler can be stopped on shutdown.
  enqueueIncrementalWithLogging();
  const incrementalTimer = setInterval(enqueueIncrementalWithLogging, 5_000);
  console.log("[worker] Started 5-second incremental enqueue interval");

  // REFRESH_SALES_METRICS: reads the optional forceColdLoad flag from the
  // first job of the delivered batch (defaults to false).
  const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
  await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
    const job = Array.isArray(jobs) ? jobs[0] : jobs;
    const data = job?.data as { forceColdLoad?: boolean } | undefined;
    const forceColdLoad = data?.forceColdLoad ?? false;
    await executeSalesMetricsRefresh({ forceColdLoad });
  });
  console.log("[worker] Registered REFRESH_SALES_METRICS job handler");

  // Initiate manager socket connection (will reconnect automatically if it fails)
  ensureManagerSocketReady().catch((err) => {
    console.error("[worker] Failed to establish manager socket:", err.message);
    console.log("[worker] Will continue retrying in background...");
  });

  console.log("[worker] Worker process ready - waiting for job requests");

  // Graceful shutdown on SIGTERM: stop scheduling new work, stop PgBoss,
  // then exit. A failing stop is logged and reported through the exit code
  // instead of surfacing as an unhandled rejection.
  let shuttingDown = false;
  process.on("SIGTERM", async () => {
    if (shuttingDown) return; // ignore repeated signals while stopping
    shuttingDown = true;

    console.log("[worker] SIGTERM received, shutting down gracefully");
    // Stop enqueueing before PgBoss goes away so the scheduler does not
    // keep sending to a stopping instance.
    clearInterval(incrementalTimer);

    let exitCode = 0;
    try {
      await boss.stop();
    } catch (err: unknown) {
      console.error("[worker] PgBoss shutdown failed", err);
      exitCode = 1;
    }
    process.exit(exitCode);
  });
}
|