146 lines
4.9 KiB
TypeScript
146 lines
4.9 KiB
TypeScript
import app from "./api/server";
|
|
import { setupSockets } from "./api/sockets";
|
|
import { engine, PORT, prisma } from "./constants";
|
|
import { unifiSites } from "./managers/unifiSites";
|
|
import { events } from "./modules/globalEvents";
|
|
import { setupEventDebugger } from "./modules/logging/eventDebugger";
|
|
import { signPermissions } from "./modules/permission-utils/signPermissions";
|
|
import { RoleController } from "./controllers/RoleController";
|
|
import { initializeWorkerSystem, getBoss } from "./workert";
|
|
import { WorkerQueue } from "./modules/workers/queues";
|
|
import { enqueueIncrementalSync } from "./modules/workers/incremental-sync";
|
|
import { startCommsServer } from "./modules/workers/coms";
|
|
import cuid from "cuid";
|
|
|
|
// Command-line flags: everything in Bun.argv after its first two entries.
const startupArgs = new Set(Bun.argv.slice(2));

// Simple-terminal mode is on when either spelling of the flag was passed.
const simpleTerminalMode = ["-st", "--simple-terminal"].some((flag) =>
  startupArgs.has(flag)
);
|
|
|
|
// Set up the global event debugger. It is enabled only when NODE_ENV is
// exactly "development" (other non-production values such as "test" do NOT
// enable it) and simple-terminal mode was not requested on the command line.
if (Bun.env.NODE_ENV === "development" && !simpleTerminalMode) {
  setupEventDebugger({ processLabel: "API" });
}
|
|
|
|
// Helper to run a startup task safely — failures are logged but never crash the process.
//
// label: name of the task; it is interpolated into the "[startup] ... failed" log line.
// fn:    the task itself. It may return a promise or complete synchronously;
//        a synchronous throw and an asynchronous rejection are handled the same way.
//
// The returned promise resolves once the task has settled, whether it succeeded
// or failed, so callers can `await` it without their own try/catch.
const safeStartup = async (
  label: string,
  fn: () => Promise<void> | void
): Promise<void> => {
  try {
    await fn();
  } catch (err: unknown) {
    // Log with the raw error object so the stack trace is preserved.
    console.error(`[startup] ${label} failed`, err);
  }
};
|
|
|
|
// ---------------------------------------------------------------------------
// The HTTP server is started before anything else so the pod is reachable
// immediately. Every data-sync task below runs after this point and does not
// delay the listener.
// ---------------------------------------------------------------------------
Bun.serve({
  port: PORT,
  // NOTE(review): Bun documents idleTimeout in seconds — confirm 255 is the intended value.
  idleTimeout: 255,
  websocket: engine.handler().websocket,
  fetch(req, server) {
    const { pathname } = new URL(req.url);

    // Requests under /socket.io/ go to the socket engine; everything else is
    // served by the regular API app. The `as any` cast is carried over as-is:
    // the server type that engine.handleRequest expects is not visible here.
    return pathname.startsWith("/socket.io/")
      ? engine.handleRequest(req, server as any)
      : app.fetch(req, server);
  },
});

console.log(`[startup] Server listening on port ${PORT}`);
|
|
|
|
// Register the socket namespaces now that the server is up.
setupSockets();
console.log("[startup] Socket namespaces initialized");

// Bring up the worker system (PgBoss connection). A failure here is logged by
// safeStartup and startup continues.
await safeStartup("initializeWorkerSystem", async () => {
  await initializeWorkerSystem();
});

// Start the inter-process comms server so the worker can connect on :8671.
// This call is not wrapped in safeStartup, so an exception here propagates.
startCommsServer();
console.log("[startup] Comms server listening on :8671");
|
|
|
|
// Queue one full dalpuri sync each time the process starts.
await safeStartup("enqueueDalpuriFullSync", async () => {
  const boss = getBoss();
  // The singleton key embeds the current timestamp, so it differs between
  // process starts.
  const jobId = await boss.send(
    WorkerQueue.DALPURI_FULL_SYNC,
    {},
    { singletonKey: `startup-${Date.now()}` }
  );

  // A falsy id means the send did not produce a job; warn and stop.
  if (!jobId) {
    console.warn("[startup] Dalpuri full sync send returned null — job may already be pending or PgBoss not ready");
    return;
  }

  console.log(`[startup] Dalpuri full sync enqueued: ${jobId}`);
});
|
|
|
|
// Every 5 seconds the API process enqueues the incremental sync jobs itself,
// so the schedule keeps running across worker restarts.
setInterval(() => {
  // A rejected enqueue is only logged; the next tick tries again.
  const reportFailure = (err: any) => {
    console.error(`[interval] enqueueIncrementalSync failed: ${err?.message ?? err}`);
  };

  void enqueueIncrementalSync().catch(reportFailure);
}, 5_000);
|
|
|
|
// ---------------------------------------------------------------------------
// Background initialisation — none of this blocks the server.
// ---------------------------------------------------------------------------

// Ensure the "administrator" role exists, creating it on first start.
await safeStartup("ensureAdminRole", async () => {
  // Existence check only: select just the id instead of loading every user of
  // the role (and each user's roles), since the result is used as a boolean.
  const existingAdmin = await prisma.role.findFirst({
    where: { moniker: "administrator" },
    select: { id: true },
  });
  if (existingAdmin) {
    return;
  }

  // The id is generated up front because it is both the row's primary key
  // and the `subject` embedded in the signed permissions.
  const id = cuid();
  const created = await prisma.role.create({
    data: {
      id,
      moniker: "administrator",
      title: "Admin",
      // NOTE(review): "*" is presumably the wildcard for "all permissions" — confirm.
      permissions: signPermissions({
        issuer: "roles",
        subject: id,
        permissions: ["*"],
      }),
    },
    // The created row is loaded with users and their roles for RoleController.
    include: { users: { include: { roles: true } } },
  });

  // Announce the new role to the rest of the process.
  events.emit("role:created", new RoleController(created));
});
|
|
|
|
// Queue one cold-load sales metrics refresh each time the process starts.
await safeStartup("enqueueSalesMetricsRefresh", async () => {
  const boss = getBoss();
  // The singleton key embeds the current timestamp, so it differs between
  // process starts.
  const jobId = await boss.send(
    WorkerQueue.REFRESH_SALES_METRICS,
    { forceColdLoad: true },
    { singletonKey: `startup-metrics-${Date.now()}` }
  );

  // A falsy id means the send did not produce a job; warn and stop.
  if (!jobId) {
    console.warn("[startup] Sales metrics refresh send returned null — job may already be pending");
    return;
  }

  console.log(`[startup] Sales metrics refresh enqueued: ${jobId}`);
});
|
|
|
|
// Enqueue a metrics refresh every 5 minutes.
setInterval(() => {
  // getBoss() is a synchronous call. It is invoked inside the promise chain so
  // that a synchronous throw (e.g. if the worker system failed to initialise
  // earlier, which startup tolerates) is routed to the .catch below instead of
  // escaping the timer callback as an uncaught exception.
  void Promise.resolve()
    .then(() =>
      getBoss().send(WorkerQueue.REFRESH_SALES_METRICS, {}, { singletonKey: "metrics-interval" })
    )
    .catch((err) =>
      console.error(`[interval] REFRESH_SALES_METRICS enqueue failed: ${err?.message ?? err}`)
    );
}, 5 * 60 * 1000);
|
|
|
|
// Sync UniFi sites once during startup; a failure is logged by safeStartup.
await safeStartup("syncSites", async () => {
  await unifiSites.syncSites();
});

// Then repeat the sync every 60 seconds.
setInterval(() => {
  // A failed sync is only logged; the next tick tries again.
  const logSyncFailure = (err: any) => {
    console.error(`[interval] syncSites failed: ${err?.message ?? err}`);
  };

  void unifiSites.syncSites().catch(logSyncFailure);
}, 60_000);
|