import app from "./api/server";
import { setupSockets } from "./api/sockets";
import { engine, PORT, prisma } from "./constants";
import { unifiSites } from "./managers/unifiSites";
import { events } from "./modules/globalEvents";
import { setupEventDebugger } from "./modules/logging/eventDebugger";
import { signPermissions } from "./modules/permission-utils/signPermissions";
import { RoleController } from "./controllers/RoleController";
// NOTE(review): "./workert" looks like it could be a typo — confirm the module path.
import { initializeWorkerSystem, getBoss } from "./workert";
import { WorkerQueue } from "./modules/workers/queues";
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
import { startCommsServer } from "./modules/workers/coms";
import cuid from "cuid";

// Everything after the first two argv entries is treated as a startup flag.
const startupArgs = new Set(Bun.argv.slice(2));
const simpleTerminalMode = startupArgs.has("-st") || startupArgs.has("--simple-terminal");

// Set up the global event debugger in development only, and skip it when the
// process was started in simple-terminal mode (-st / --simple-terminal).
if (Bun.env.NODE_ENV === "development" && !simpleTerminalMode) {
  setupEventDebugger({ processLabel: "API" });
}

/**
 * Runs a startup task safely — failures are logged but never crash the process.
 *
 * @param label - Name of the task, included in the error log line.
 * @param fn - Async task to run; its resolved value is ignored.
 * @returns A promise that always resolves, whether or not `fn` rejected.
 */
const safeStartup = async (label: string, fn: () => Promise<unknown>): Promise<void> => {
  try {
    await fn();
  } catch (err) {
    console.error(`[startup] ${label} failed`, err);
  }
};

// ---------------------------------------------------------------------------
// Start the HTTP server FIRST so the pod is reachable immediately.
// All data-sync tasks run afterwards and are non-blocking.
// ---------------------------------------------------------------------------

// One Bun server handles both Socket.IO traffic and the regular HTTP app.
Bun.serve({
  port: PORT,
  idleTimeout: 255,
  websocket: engine.handler().websocket,
  fetch: (req, server) => {
    // Requests under /socket.io/ go to the socket engine; the rest go to the app.
    const { pathname } = new URL(req.url);
    const isSocketTraffic = pathname.startsWith("/socket.io/");
    return isSocketTraffic
      ? engine.handleRequest(req, server as any)
      : app.fetch(req, server);
  },
});
console.log(`[startup] Server listening on port ${PORT}`);

setupSockets();
console.log("[startup] Socket namespaces initialized");

// Bring up the worker system (PgBoss connection); a failure is logged, not fatal.
await safeStartup("initializeWorkerSystem", () => initializeWorkerSystem());

// Open the inter-process comms server so the worker can connect on :8671.
startCommsServer();
console.log("[startup] Comms server listening on :8671");

// Queue a full dalpuri sync once per process start.
await safeStartup("enqueueDalpuriFullSync", async () => {
  const singletonKey = `startup-${Date.now()}`;
  const jobId = await getBoss().send(WorkerQueue.DALPURI_FULL_SYNC, {}, { singletonKey });

  if (!jobId) {
    console.warn("[startup] Dalpuri full sync send returned null — job may already be pending or PgBoss not ready");
    return;
  }
  console.log(`[startup] Dalpuri full sync enqueued: ${jobId}`);
});

// Incremental sync jobs are broadcast from the API process every 5s so the
// interval survives worker restarts.
const INCREMENTAL_SYNC_INTERVAL_MS = 5_000;

const broadcastIncrementalSync = (): void => {
  enqueueIncrementalSync().catch((err) =>
    console.error(`[interval] enqueueIncrementalSync failed: ${err?.message ?? err}`)
  );
};

setInterval(broadcastIncrementalSync, INCREMENTAL_SYNC_INTERVAL_MS);

// ---------------------------------------------------------------------------
// Background initialisation — none of this blocks the server.
// ---------------------------------------------------------------------------

// Ensure administrator role exists
await safeStartup("ensureAdminRole", async () => {
  // Only existence matters for this check, so select the id alone instead of
  // loading every user attached to the role together with their roles.
  const existingAdmin = await prisma.role.findFirst({
    where: { moniker: "administrator" },
    select: { id: true },
  });
  if (existingAdmin) return;

  // The id is generated up front because the signed permissions use it as subject.
  const id = cuid();
  const created = await prisma.role.create({
    data: {
      id,
      moniker: "administrator",
      title: "Admin",
      permissions: signPermissions({
        issuer: "roles",
        subject: id,
        permissions: ["*"],
      }),
    },
    include: { users: { include: { roles: true } } },
  });
  events.emit("role:created", new RoleController(created));
});

// Enqueue an initial cold-load metrics refresh on startup
await safeStartup("enqueueSalesMetricsRefresh", async () => {
  const jobId = await getBoss().send(
    WorkerQueue.REFRESH_SALES_METRICS,
    { forceColdLoad: true },
    { singletonKey: `startup-metrics-${Date.now()}` }
  );
  if (jobId) {
    console.log(`[startup] Sales metrics refresh enqueued: ${jobId}`);
  } else {
    console.warn("[startup] Sales metrics refresh send returned null — job may already be pending");
  }
});

// Enqueue a metrics refresh every 5 minutes.
// The send is wrapped in an async function so that a synchronous throw from
// getBoss() becomes a rejection and is logged by the .catch below, rather than
// escaping the timer callback as an uncaught exception.
const enqueueMetricsRefresh = async () =>
  getBoss().send(WorkerQueue.REFRESH_SALES_METRICS, {}, { singletonKey: "metrics-interval" });

setInterval(() => {
  enqueueMetricsRefresh().catch((err) =>
    console.error(`[interval] REFRESH_SALES_METRICS enqueue failed: ${err?.message ?? err}`)
  );
}, 5 * 60 * 1000);

// Sync UniFi sites once at startup, then again every minute.
await safeStartup("syncSites", async () => {
  await unifiSites.syncSites();
});

setInterval(() => {
  unifiSites
    .syncSites()
    .catch((err) => console.error(`[interval] syncSites failed: ${err?.message ?? err}`));
}, 60 * 1000);