fix(worker): break circular import by extracting PgBoss singleton
incremental-sync.ts and api/cw/sync.ts imported getBoss() from workert.ts. When workert.ts (the entry point) dynamically imported incremental-sync.ts, it triggered a circular module re-evaluation that hung indefinitely. Extract the PgBoss singleton and getBoss() factory to a new boss-instance.ts module that neither has top-level async side-effects nor imports from workert.ts. All consumers (workert.ts, index.ts, incremental-sync.ts, cw/sync.ts) now import from boss-instance.ts instead.
This commit is contained in:
@@ -2,7 +2,7 @@ import { createRoute } from "../../modules/api-utils/createRoute";
|
|||||||
import { apiResponse } from "../../modules/api-utils/apiResponse";
|
import { apiResponse } from "../../modules/api-utils/apiResponse";
|
||||||
import { ContentfulStatusCode } from "hono/utils/http-status";
|
import { ContentfulStatusCode } from "hono/utils/http-status";
|
||||||
import { authMiddleware } from "../middleware/authorization";
|
import { authMiddleware } from "../middleware/authorization";
|
||||||
import { getBoss } from "../../workert";
|
import { getBoss } from "../../boss-instance";
|
||||||
import { WorkerQueue } from "../../modules/workers/queues";
|
import { WorkerQueue } from "../../modules/workers/queues";
|
||||||
|
|
||||||
/* POST /v1/cw/sync/full */
|
/* POST /v1/cw/sync/full */
|
||||||
|
|||||||
@@ -0,0 +1,30 @@
|
|||||||
|
/**
|
||||||
|
* Shared PgBoss singleton — kept in its own module to break circular imports
|
||||||
|
* between workert.ts and the worker modules that call getBoss().
|
||||||
|
*/
|
||||||
|
import { PgBoss } from "pg-boss";
|
||||||
|
|
||||||
|
function makePgBossUrl(rawUrl: string): string {
|
||||||
|
try {
|
||||||
|
const u = new URL(rawUrl);
|
||||||
|
// 30-second statement timeout to prevent individual SQL queries from
|
||||||
|
// hanging indefinitely if the DB server stops responding mid-query.
|
||||||
|
u.searchParams.set("options", "-c statement_timeout=30000");
|
||||||
|
return u.toString();
|
||||||
|
} catch {
|
||||||
|
return rawUrl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const boss = new PgBoss({
|
||||||
|
connectionString: makePgBossUrl(process.env.DATABASE_URL!),
|
||||||
|
connectionTimeoutMillis: 15_000,
|
||||||
|
});
|
||||||
|
|
||||||
|
boss.on("error", (err) => {
|
||||||
|
console.error("[worker] PgBoss error", err);
|
||||||
|
});
|
||||||
|
|
||||||
|
export function getBoss(): PgBoss {
|
||||||
|
return boss;
|
||||||
|
}
|
||||||
+2
-1
@@ -6,7 +6,8 @@ import { events } from "./modules/globalEvents";
|
|||||||
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
||||||
import { signPermissions } from "./modules/permission-utils/signPermissions";
|
import { signPermissions } from "./modules/permission-utils/signPermissions";
|
||||||
import { RoleController } from "./controllers/RoleController";
|
import { RoleController } from "./controllers/RoleController";
|
||||||
import { initializeWorkerSystem, getBoss } from "./workert";
|
import { initializeWorkerSystem } from "./workert";
|
||||||
|
import { getBoss } from "./boss-instance";
|
||||||
import { WorkerQueue } from "./modules/workers/queues";
|
import { WorkerQueue } from "./modules/workers/queues";
|
||||||
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
|
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
|
||||||
import { startCommsServer } from "./modules/workers/coms";
|
import { startCommsServer } from "./modules/workers/coms";
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { getBoss } from "../../workert";
|
import { getBoss } from "../../boss-instance";
|
||||||
import { WorkerQueue } from "./queues";
|
import { WorkerQueue } from "./queues";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
+1
-31
@@ -1,29 +1,7 @@
|
|||||||
import { PgBoss } from "pg-boss";
|
|
||||||
import { io, Socket } from "socket.io-client";
|
import { io, Socket } from "socket.io-client";
|
||||||
import { WorkerQueue } from "./modules/workers/queues";
|
import { WorkerQueue } from "./modules/workers/queues";
|
||||||
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
||||||
|
import { boss, getBoss } from "./boss-instance";
|
||||||
// Add statement and connection timeouts to prevent silent hangs
|
|
||||||
function makePgBossUrl(rawUrl: string): string {
|
|
||||||
try {
|
|
||||||
const u = new URL(rawUrl);
|
|
||||||
// 30-second statement timeout and 15-second TCP connect timeout
|
|
||||||
u.searchParams.set("options", "-c statement_timeout=30000");
|
|
||||||
u.searchParams.set("connect_timeout", "15");
|
|
||||||
return u.toString();
|
|
||||||
} catch {
|
|
||||||
return rawUrl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const boss = new PgBoss({
|
|
||||||
connectionString: makePgBossUrl(process.env.DATABASE_URL!),
|
|
||||||
connectionTimeoutMillis: 15_000,
|
|
||||||
});
|
|
||||||
|
|
||||||
boss.on("error", (err) => {
|
|
||||||
console.error("[worker] PgBoss error", err);
|
|
||||||
});
|
|
||||||
|
|
||||||
let bossStartPromise: Promise<void> | null = null;
|
let bossStartPromise: Promise<void> | null = null;
|
||||||
let reservationQueueReady = false;
|
let reservationQueueReady = false;
|
||||||
@@ -160,14 +138,6 @@ export async function initializeWorkerSystem(): Promise<void> {
|
|||||||
console.log("[worker] Worker system initialized - ready for job enqueueing");
|
console.log("[worker] Worker system initialized - ready for job enqueueing");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the PgBoss instance for direct job enqueueing.
|
|
||||||
* Must call initializeWorkerSystem() first.
|
|
||||||
*/
|
|
||||||
export function getBoss(): PgBoss {
|
|
||||||
return boss;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (import.meta.main) {
|
if (import.meta.main) {
|
||||||
// if (Bun.env.NODE_ENV === "development") {
|
// if (Bun.env.NODE_ENV === "development") {
|
||||||
// setupEventDebugger({ processLabel: "WORKER" });
|
// setupEventDebugger({ processLabel: "WORKER" });
|
||||||
|
|||||||
Reference in New Issue
Block a user