fix(worker): add PgBoss startup timeouts and debug logging

- Add statement_timeout=30000ms to PgBoss connection URL to prevent
  SQL queries from hanging indefinitely
- Add connectionTimeoutMillis=15s to PgBoss config for connection timeout
- Wrap boss.start() in 30s Promise.race timeout with process.exit(1)
  on failure to ensure container restarts instead of hanging silently
- Add debug logging around PgBoss startup to diagnose connection issues
This commit is contained in:
2026-04-13 23:53:32 +00:00
parent 5f5f610060
commit 7f6e6fdfbc
+31 -2
View File
@@ -3,7 +3,23 @@ import { io, Socket } from "socket.io-client";
import { WorkerQueue } from "./modules/workers/queues";
import { setupEventDebugger } from "./modules/logging/eventDebugger";
const boss = new PgBoss(process.env.DATABASE_URL!);
// Add statement and connection timeouts to prevent silent hangs
function makePgBossUrl(rawUrl: string): string {
try {
const u = new URL(rawUrl);
// 30-second statement timeout and 15-second TCP connect timeout
u.searchParams.set("options", "-c statement_timeout=30000");
u.searchParams.set("connect_timeout", "15");
return u.toString();
} catch {
return rawUrl;
}
}
const boss = new PgBoss({
connectionString: makePgBossUrl(process.env.DATABASE_URL!),
connectionTimeoutMillis: 15_000,
});
boss.on("error", (err) => {
console.error("[worker] PgBoss error", err);
@@ -155,9 +171,22 @@ if (import.meta.main) {
console.log(
`[worker] Connecting to PgBoss on DATABASE_URL and SocketIO on ${process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671"}`
);
console.log(`[worker] DATABASE_URL set: ${!!process.env.DATABASE_URL}`);
// Ensure PgBoss is connected and queues exist
await ensureBossStarted();
console.log("[worker] Starting PgBoss...");
try {
await Promise.race([
ensureBossStarted(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("boss.start() timed out after 30s")), 30_000)
),
]);
} catch (err) {
console.error("[worker] FATAL: PgBoss failed to start:", err);
process.exit(1);
}
console.log("[worker] PgBoss started successfully");
await ensureDalpuriSyncQueue();
// Register job handler for DALPURI_FULL_SYNC