fix(worker): restore reliable 5s incremental sync cadence
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import { Server } from "socket.io";
|
||||
import { events, EventTypes } from "../globalEvents";
|
||||
import { WorkerQueue } from "./queues";
|
||||
import { reserveWorkerId } from "../../workert";
|
||||
|
||||
function emitGlobalEvent<K extends keyof EventTypes>(
|
||||
name: K,
|
||||
|
||||
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
|
||||
* Called on an interval from the main API process so it survives worker restarts.
|
||||
*/
|
||||
export async function enqueueIncrementalSync(): Promise<void> {
|
||||
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {});
|
||||
const jobId = await getBoss().send(
|
||||
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
|
||||
{
|
||||
enqueuedAt: new Date().toISOString(),
|
||||
},
|
||||
{
|
||||
singletonKey: "dalpuri-incremental-sync",
|
||||
}
|
||||
);
|
||||
|
||||
if (!jobId) {
|
||||
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
|
||||
queueType: WorkerQueue,
|
||||
workFn: (workerSocket: Socket) => Promise<T>,
|
||||
): Promise<T> {
|
||||
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
// Request a worker ID and namespace from the manager
|
||||
socket.emit(
|
||||
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
|
||||
}
|
||||
|
||||
// Connect to the worker-specific namespace
|
||||
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, {
|
||||
const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
|
||||
reconnection: false,
|
||||
});
|
||||
|
||||
|
||||
+29
-1
@@ -163,6 +163,7 @@ if (import.meta.main) {
|
||||
// Register job handler for DALPURI_FULL_SYNC
|
||||
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
||||
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
||||
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
|
||||
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
||||
const socket = await ensureManagerSocketReady();
|
||||
await enqueueDalpuriFullSync(socket);
|
||||
@@ -170,10 +171,37 @@ if (import.meta.main) {
|
||||
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
||||
|
||||
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
||||
await executeIncrementalSync();
|
||||
const startedAt = Date.now();
|
||||
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
|
||||
try {
|
||||
await executeIncrementalSync();
|
||||
console.log(
|
||||
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
|
||||
);
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
|
||||
err
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
||||
|
||||
const enqueueIncrementalWithLogging = () => {
|
||||
enqueueIncrementalSync().catch((err) => {
|
||||
console.error(
|
||||
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
// Keep a worker-local 5s scheduler so incremental sync continues even when
|
||||
// API interval scheduling is unavailable.
|
||||
enqueueIncrementalWithLogging();
|
||||
setInterval(enqueueIncrementalWithLogging, 5_000);
|
||||
console.log("[worker] Started 5-second incremental enqueue interval");
|
||||
|
||||
// Register job handler for REFRESH_SALES_METRICS
|
||||
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
||||
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
||||
|
||||
Reference in New Issue
Block a user