all the haul
This commit is contained in:
+88
-63
@@ -1,9 +1,6 @@
|
||||
import { PgBoss } from "pg-boss";
|
||||
import { io, Socket } from "socket.io-client";
|
||||
import { WorkerQueue } from "./modules/workers/queues";
|
||||
import { refreshActiveOpportunitiesWorker } from "./modules/workers/cache/refreshActiveOpportunities";
|
||||
import { refreshOpportunities } from "./modules/cw-utils/opportunities/refreshOpportunities";
|
||||
import { opportunityCw } from "./modules/cw-utils/opportunities/opportunities";
|
||||
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
||||
|
||||
const boss = new PgBoss(process.env.DATABASE_URL!);
|
||||
@@ -14,7 +11,6 @@ boss.on("error", (err) => {
|
||||
|
||||
let bossStartPromise: Promise<void> | null = null;
|
||||
let reservationQueueReady = false;
|
||||
let opportunityWorkersRegistered = false;
|
||||
let managerSocket: Socket | null = null;
|
||||
let managerSocketReadyPromise: Promise<Socket> | null = null;
|
||||
|
||||
@@ -36,13 +32,16 @@ async function ensureReservationQueue(): Promise<void> {
|
||||
reservationQueueReady = true;
|
||||
}
|
||||
|
||||
function ensureManagerSocketReady(): Promise<Socket> {
|
||||
export function ensureManagerSocketReady(): Promise<Socket> {
|
||||
if (managerSocket && managerSocket.connected)
|
||||
return Promise.resolve(managerSocket);
|
||||
if (managerSocketReadyPromise) return managerSocketReadyPromise;
|
||||
|
||||
managerSocket = io("http://localhost:8671", {
|
||||
reconnection: true,
|
||||
reconnectionDelay: 1000,
|
||||
reconnectionDelayMax: 5000,
|
||||
reconnectionAttempts: Infinity,
|
||||
});
|
||||
|
||||
managerSocket.on("connect", () => {
|
||||
@@ -57,7 +56,11 @@ function ensureManagerSocketReady(): Promise<Socket> {
|
||||
console.error("[worker] Manager socket connect_error", err.message);
|
||||
});
|
||||
|
||||
managerSocketReadyPromise = new Promise<Socket>((resolve, reject) => {
|
||||
managerSocket.on("reconnect_attempt", () => {
|
||||
console.log("[worker] Attempting to reconnect to manager socket...");
|
||||
});
|
||||
|
||||
managerSocketReadyPromise = new Promise<Socket>((resolve) => {
|
||||
const socket = managerSocket!;
|
||||
|
||||
const onConnect = () => {
|
||||
@@ -66,13 +69,12 @@ function ensureManagerSocketReady(): Promise<Socket> {
|
||||
};
|
||||
|
||||
const onConnectError = (err: Error) => {
|
||||
socket.off("connect", onConnect);
|
||||
managerSocketReadyPromise = null;
|
||||
reject(err);
|
||||
// Log the error but don't reject - let Socket.IO handle reconnection
|
||||
console.warn("[worker] Connection attempt failed:", err.message, "- retrying...");
|
||||
};
|
||||
|
||||
socket.once("connect", onConnect);
|
||||
socket.once("connect_error", onConnectError);
|
||||
socket.on("connect_error", onConnectError);
|
||||
});
|
||||
|
||||
return managerSocketReadyPromise;
|
||||
@@ -106,70 +108,93 @@ export async function reserveWorkerId(queueType: WorkerQueue): Promise<string> {
|
||||
return workerId;
|
||||
}
|
||||
|
||||
async function ensureOpportunityQueues(): Promise<void> {
|
||||
async function ensureDalpuriSyncQueue(): Promise<void> {
|
||||
try {
|
||||
await boss.createQueue(WorkerQueue.REFRESH_ACTIVE_OPPORTUNITIES);
|
||||
await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC);
|
||||
} catch {
|
||||
// Queue may already exist; ignore.
|
||||
// Queue may already exist; ignore to keep this idempotent.
|
||||
}
|
||||
try {
|
||||
await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC);
|
||||
} catch {
|
||||
// Queue may already exist; ignore to keep this idempotent.
|
||||
}
|
||||
try {
|
||||
await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS);
|
||||
} catch {
|
||||
// Queue may already exist; ignore to keep this idempotent.
|
||||
}
|
||||
}
|
||||
|
||||
export async function startOpportunityCacheWorkers(): Promise<void> {
|
||||
if (opportunityWorkersRegistered) return;
|
||||
|
||||
/**
|
||||
* Initialize the worker system. Must be called before enqueueing jobs.
|
||||
* Starts PgBoss connection to PostgreSQL.
|
||||
*/
|
||||
export async function initializeWorkerSystem(): Promise<void> {
|
||||
await ensureBossStarted();
|
||||
await ensureReservationQueue();
|
||||
await ensureOpportunityQueues();
|
||||
|
||||
boss.work(WorkerQueue.REFRESH_ACTIVE_OPPORTUNITIES, async () => {
|
||||
const socket = await ensureManagerSocketReady();
|
||||
await refreshActiveOpportunitiesWorker(socket, {
|
||||
runFullRefresh: () =>
|
||||
refreshOpportunities({
|
||||
collectorFetch: () =>
|
||||
opportunityCw.fetchAllOpportunitiesFromCollector(),
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
opportunityWorkersRegistered = true;
|
||||
await ensureDalpuriSyncQueue();
|
||||
console.log("[worker] Worker system initialized - ready for job enqueueing");
|
||||
}
|
||||
|
||||
export async function enqueueActiveOpportunityRefreshJob(): Promise<string> {
|
||||
await ensureBossStarted();
|
||||
await ensureOpportunityQueues();
|
||||
|
||||
const jobId = await boss.send(WorkerQueue.REFRESH_ACTIVE_OPPORTUNITIES, {
|
||||
enqueuedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
if (!jobId) {
|
||||
throw new Error("Failed to enqueue active opportunity refresh job");
|
||||
}
|
||||
|
||||
return jobId;
|
||||
}
|
||||
|
||||
export async function enqueueArchivedOpportunityRefreshJob(
|
||||
force = false,
|
||||
): Promise<string> {
|
||||
console.warn(
|
||||
`[worker] enqueueArchivedOpportunityRefreshJob(force=${force}) is deprecated; enqueueing unified active refresh job instead`,
|
||||
);
|
||||
return enqueueActiveOpportunityRefreshJob();
|
||||
/**
|
||||
* Get the PgBoss instance for direct job enqueueing.
|
||||
* Must call initializeWorkerSystem() first.
|
||||
*/
|
||||
export function getBoss(): PgBoss {
|
||||
return boss;
|
||||
}
|
||||
|
||||
if (import.meta.main) {
|
||||
if (Bun.env.NODE_ENV === "development") {
|
||||
setupEventDebugger({ processLabel: "WORKER" });
|
||||
}
|
||||
// if (Bun.env.NODE_ENV === "development") {
|
||||
// setupEventDebugger({ processLabel: "WORKER" });
|
||||
// }
|
||||
|
||||
startOpportunityCacheWorkers()
|
||||
.then(() => {
|
||||
console.log("[worker] Opportunity cache workers started");
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error("[worker] Failed to start opportunity cache workers", err);
|
||||
process.exit(1);
|
||||
});
|
||||
console.log("[worker] Worker process starting...");
|
||||
console.log(
|
||||
"[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on :8671"
|
||||
);
|
||||
|
||||
// Ensure PgBoss is connected and queues exist
|
||||
await ensureBossStarted();
|
||||
await ensureDalpuriSyncQueue();
|
||||
|
||||
// Register job handler for DALPURI_FULL_SYNC
|
||||
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
||||
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
||||
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
||||
const socket = await ensureManagerSocketReady();
|
||||
await enqueueDalpuriFullSync();
|
||||
});
|
||||
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
||||
|
||||
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
||||
await executeIncrementalSync();
|
||||
});
|
||||
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
||||
|
||||
// Register job handler for REFRESH_SALES_METRICS
|
||||
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
||||
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
||||
const job = Array.isArray(jobs) ? jobs[0] : jobs;
|
||||
const data = job?.data as { forceColdLoad?: boolean } | undefined;
|
||||
const forceColdLoad = data?.forceColdLoad ?? false;
|
||||
await executeSalesMetricsRefresh({ forceColdLoad });
|
||||
});
|
||||
console.log("[worker] Registered REFRESH_SALES_METRICS job handler");
|
||||
|
||||
// Initiate manager socket connection (will reconnect automatically if it fails)
|
||||
ensureManagerSocketReady().catch((err) => {
|
||||
console.error("[worker] Failed to establish manager socket:", err.message);
|
||||
console.log("[worker] Will continue retrying in background...");
|
||||
});
|
||||
|
||||
console.log("[worker] Worker process ready - waiting for job requests");
|
||||
|
||||
// Keep process alive
|
||||
process.on("SIGTERM", async () => {
|
||||
console.log("[worker] SIGTERM received, shutting down gracefully");
|
||||
await boss.stop();
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user