diff --git a/api/src/api/cw/sync.ts b/api/src/api/cw/sync.ts index 49a474d..cc63bb2 100644 --- a/api/src/api/cw/sync.ts +++ b/api/src/api/cw/sync.ts @@ -2,7 +2,7 @@ import { createRoute } from "../../modules/api-utils/createRoute"; import { apiResponse } from "../../modules/api-utils/apiResponse"; import { ContentfulStatusCode } from "hono/utils/http-status"; import { authMiddleware } from "../middleware/authorization"; -import { getBoss } from "../../workert"; +import { getBoss } from "../../boss-instance"; import { WorkerQueue } from "../../modules/workers/queues"; /* POST /v1/cw/sync/full */ diff --git a/api/src/boss-instance.ts b/api/src/boss-instance.ts new file mode 100644 index 0000000..c431a4b --- /dev/null +++ b/api/src/boss-instance.ts @@ -0,0 +1,30 @@ +/** + * Shared PgBoss singleton — kept in its own module to break circular imports + * between workert.ts and the worker modules that call getBoss(). + */ +import { PgBoss } from "pg-boss"; + +function makePgBossUrl(rawUrl: string): string { + try { + const u = new URL(rawUrl); + // 30-second statement timeout to prevent individual SQL queries from + // hanging indefinitely if the DB server stops responding mid-query. + u.searchParams.set("options", "-c statement_timeout=30000"); + return u.toString(); + } catch { + return rawUrl; + } +} + +export const boss = new PgBoss({ + connectionString: makePgBossUrl(process.env.DATABASE_URL!), + connectionTimeoutMillis: 15_000, +}); + +boss.on("error", (err) => { + console.error("[worker] PgBoss error", err); +}); + +export function getBoss(): PgBoss { + return boss; +} diff --git a/api/src/index.ts b/api/src/index.ts index 07e2d67..5093921 100644 --- a/api/src/index.ts +++ b/api/src/index.ts @@ -6,7 +6,8 @@ import { events } from "./modules/globalEvents"; import { setupEventDebugger } from "./modules/logging/eventDebugger"; import { signPermissions } from "./modules/permission-utils/signPermissions"; import { RoleController } from "./controllers/RoleController"; -import { initializeWorkerSystem, getBoss } from "./workert"; +import { initializeWorkerSystem } from "./workert"; +import { getBoss } from "./boss-instance"; import { WorkerQueue } from "./modules/workers/queues"; import { enqueueIncrementalSync } from "./modules/workers/incremental-sync"; import { startCommsServer } from "./modules/workers/coms"; diff --git a/api/src/modules/workers/incremental-sync.ts b/api/src/modules/workers/incremental-sync.ts index bd7533e..df64440 100644 --- a/api/src/modules/workers/incremental-sync.ts +++ b/api/src/modules/workers/incremental-sync.ts @@ -1,4 +1,4 @@ -import { getBoss } from "../../workert"; +import { getBoss } from "../../boss-instance"; import { WorkerQueue } from "./queues"; /** diff --git a/api/src/workert.ts b/api/src/workert.ts index 3035633..d99e0b9 100644 --- a/api/src/workert.ts +++ b/api/src/workert.ts @@ -1,29 +1,7 @@ -import { PgBoss } from "pg-boss"; import { io, Socket } from "socket.io-client"; import { WorkerQueue } from "./modules/workers/queues"; import { setupEventDebugger } from "./modules/logging/eventDebugger"; - -// Add statement and connection timeouts to prevent silent hangs -function makePgBossUrl(rawUrl: string): string { - try { - const u = new URL(rawUrl); - // 30-second statement timeout and 15-second TCP connect timeout - u.searchParams.set("options", "-c statement_timeout=30000"); - u.searchParams.set("connect_timeout", "15"); - return u.toString(); - } catch { - return rawUrl; - } -} - -const boss = new PgBoss({ - connectionString: makePgBossUrl(process.env.DATABASE_URL!), - connectionTimeoutMillis: 15_000, -}); - -boss.on("error", (err) => { - console.error("[worker] PgBoss error", err); -}); +import { boss, getBoss } from "./boss-instance"; let bossStartPromise: Promise | null = null; let reservationQueueReady = false; @@ -160,14 +138,6 @@ export async function initializeWorkerSystem(): Promise { console.log("[worker] Worker system initialized - ready for job enqueueing"); } -/** - * Get the PgBoss instance for direct job enqueueing. - * Must call initializeWorkerSystem() first. - */ -export function getBoss(): PgBoss { - return boss; -} - if (import.meta.main) { // if (Bun.env.NODE_ENV === "development") { // setupEventDebugger({ processLabel: "WORKER" });