diff --git a/api/src/workert.ts b/api/src/workert.ts index dccef23..81dfc67 100644 --- a/api/src/workert.ts +++ b/api/src/workert.ts @@ -3,7 +3,23 @@ import { io, Socket } from "socket.io-client"; import { WorkerQueue } from "./modules/workers/queues"; import { setupEventDebugger } from "./modules/logging/eventDebugger"; -const boss = new PgBoss(process.env.DATABASE_URL!); +// Add statement and connection timeouts to prevent silent hangs +function makePgBossUrl(rawUrl: string): string { + try { + const u = new URL(rawUrl); + // 30-second statement timeout and 15-second TCP connect timeout + u.searchParams.set("options", "-c statement_timeout=30000"); + u.searchParams.set("connect_timeout", "15"); + return u.toString(); + } catch { + return rawUrl; + } +} + +const boss = new PgBoss({ + connectionString: makePgBossUrl(process.env.DATABASE_URL!), + connectionTimeoutMillis: 15_000, +}); boss.on("error", (err) => { console.error("[worker] PgBoss error", err); @@ -155,9 +171,22 @@ if (import.meta.main) { console.log( `[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}` ); + console.log(`[worker] DATABASE_URL set: ${!!process.env.DATABASE_URL}`); // Ensure PgBoss is connected and queues exist - await ensureBossStarted(); + console.log("[worker] Starting PgBoss..."); + try { + await Promise.race([ + ensureBossStarted(), + new Promise((_, reject) => + setTimeout(() => reject(new Error("boss.start() timed out after 30s")), 30_000) + ), + ]); + } catch (err) { + console.error("[worker] FATAL: PgBoss failed to start:", err); + process.exit(1); + } + console.log("[worker] PgBoss started successfully"); await ensureDalpuriSyncQueue(); // Register job handler for DALPURI_FULL_SYNC