fix: remove nested .git folders, re-add as normal directories

This commit is contained in:
2026-03-22 17:50:47 -05:00
parent f55c7e47c9
commit 6b7eec67b8
1870 changed files with 4170168 additions and 3 deletions
+175
View File
@@ -0,0 +1,175 @@
import { PgBoss } from "pg-boss";
import { io, Socket } from "socket.io-client";
import { WorkerQueue } from "./modules/workers/queues";
import { refreshActiveOpportunitiesWorker } from "./modules/workers/cache/refreshActiveOpportunities";
import { refreshOpportunities } from "./modules/cw-utils/opportunities/refreshOpportunities";
import { opportunityCw } from "./modules/cw-utils/opportunities/opportunities";
import { setupEventDebugger } from "./modules/logging/eventDebugger";
const boss = new PgBoss(process.env.DATABASE_URL!);
boss.on("error", (err) => {
console.error("[worker] PgBoss error", err);
});
let bossStartPromise: Promise<void> | null = null;
let reservationQueueReady = false;
let opportunityWorkersRegistered = false;
let managerSocket: Socket | null = null;
let managerSocketReadyPromise: Promise<Socket> | null = null;
async function ensureBossStarted(): Promise<void> {
if (bossStartPromise) return bossStartPromise;
bossStartPromise = boss.start().then(() => undefined);
await bossStartPromise;
}
async function ensureReservationQueue(): Promise<void> {
if (reservationQueueReady) return;
try {
await boss.createQueue(WorkerQueue.WORKER_NAMESPACE_RESERVATION);
} catch {
// Queue may already exist; ignore to keep this idempotent.
}
reservationQueueReady = true;
}
function ensureManagerSocketReady(): Promise<Socket> {
if (managerSocket && managerSocket.connected)
return Promise.resolve(managerSocket);
if (managerSocketReadyPromise) return managerSocketReadyPromise;
managerSocket = io("http://localhost:8671", {
reconnection: true,
});
managerSocket.on("connect", () => {
console.log("[worker] Connected to manager socket on :8671");
});
managerSocket.on("disconnect", (reason) => {
console.warn(`[worker] Manager socket disconnected: ${reason}`);
});
managerSocket.on("connect_error", (err) => {
console.error("[worker] Manager socket connect_error", err.message);
});
managerSocketReadyPromise = new Promise<Socket>((resolve, reject) => {
const socket = managerSocket!;
const onConnect = () => {
socket.off("connect_error", onConnectError);
resolve(socket);
};
const onConnectError = (err: Error) => {
socket.off("connect", onConnect);
managerSocketReadyPromise = null;
reject(err);
};
socket.once("connect", onConnect);
socket.once("connect_error", onConnectError);
});
return managerSocketReadyPromise;
}
/**
* Ask PgBoss to generate a durable job ID we can reuse as a worker namespace ID.
*
* A short-lived reservation job is created and immediately canceled to avoid
* growing the queue while still obtaining a PgBoss-generated ID.
*/
export async function reserveWorkerId(queueType: WorkerQueue): Promise<string> {
await ensureBossStarted();
await ensureReservationQueue();
const workerId = await boss.send(WorkerQueue.WORKER_NAMESPACE_RESERVATION, {
queueType,
requestedAt: new Date().toISOString(),
});
if (!workerId) {
throw new Error("Failed to reserve PgBoss job ID for worker namespace");
}
try {
await boss.cancel(WorkerQueue.WORKER_NAMESPACE_RESERVATION, workerId);
} catch {
// Best-effort cleanup only.
}
return workerId;
}
async function ensureOpportunityQueues(): Promise<void> {
try {
await boss.createQueue(WorkerQueue.REFRESH_ACTIVE_OPPORTUNITIES);
} catch {
// Queue may already exist; ignore.
}
}
export async function startOpportunityCacheWorkers(): Promise<void> {
if (opportunityWorkersRegistered) return;
await ensureBossStarted();
await ensureReservationQueue();
await ensureOpportunityQueues();
boss.work(WorkerQueue.REFRESH_ACTIVE_OPPORTUNITIES, async () => {
const socket = await ensureManagerSocketReady();
await refreshActiveOpportunitiesWorker(socket, {
runFullRefresh: () =>
refreshOpportunities({
collectorFetch: () =>
opportunityCw.fetchAllOpportunitiesFromCollector(),
}),
});
});
opportunityWorkersRegistered = true;
}
export async function enqueueActiveOpportunityRefreshJob(): Promise<string> {
await ensureBossStarted();
await ensureOpportunityQueues();
const jobId = await boss.send(WorkerQueue.REFRESH_ACTIVE_OPPORTUNITIES, {
enqueuedAt: new Date().toISOString(),
});
if (!jobId) {
throw new Error("Failed to enqueue active opportunity refresh job");
}
return jobId;
}
export async function enqueueArchivedOpportunityRefreshJob(
force = false,
): Promise<string> {
console.warn(
`[worker] enqueueArchivedOpportunityRefreshJob(force=${force}) is deprecated; enqueueing unified active refresh job instead`,
);
return enqueueActiveOpportunityRefreshJob();
}
if (import.meta.main) {
if (Bun.env.NODE_ENV === "development") {
setupEventDebugger({ processLabel: "WORKER" });
}
startOpportunityCacheWorkers()
.then(() => {
console.log("[worker] Opportunity cache workers started");
})
.catch((err) => {
console.error("[worker] Failed to start opportunity cache workers", err);
process.exit(1);
});
}