fix(worker): restore reliable 5s incremental sync cadence
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import { Server } from "socket.io";
|
import { Server } from "socket.io";
|
||||||
import { events, EventTypes } from "../globalEvents";
|
import { events, EventTypes } from "../globalEvents";
|
||||||
import { WorkerQueue } from "./queues";
|
import { WorkerQueue } from "./queues";
|
||||||
|
import { reserveWorkerId } from "../../workert";
|
||||||
|
|
||||||
function emitGlobalEvent<K extends keyof EventTypes>(
|
function emitGlobalEvent<K extends keyof EventTypes>(
|
||||||
name: K,
|
name: K,
|
||||||
|
|||||||
@@ -6,5 +6,17 @@ import { WorkerQueue } from "./queues";
|
|||||||
* Called on an interval from the main API process so it survives worker restarts.
|
* Called on an interval from the main API process so it survives worker restarts.
|
||||||
*/
|
*/
|
||||||
export async function enqueueIncrementalSync(): Promise<void> {
|
export async function enqueueIncrementalSync(): Promise<void> {
|
||||||
await getBoss().send(WorkerQueue.DALPURI_INCREMENTAL_SYNC, {});
|
const jobId = await getBoss().send(
|
||||||
|
WorkerQueue.DALPURI_INCREMENTAL_SYNC,
|
||||||
|
{
|
||||||
|
enqueuedAt: new Date().toISOString(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
singletonKey: "dalpuri-incremental-sync",
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!jobId) {
|
||||||
|
console.debug("[interval] DALPURI_INCREMENTAL_SYNC already pending or active");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ export async function createWorkerJob<T>(
|
|||||||
queueType: WorkerQueue,
|
queueType: WorkerQueue,
|
||||||
workFn: (workerSocket: Socket) => Promise<T>,
|
workFn: (workerSocket: Socket) => Promise<T>,
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
|
const managerUrl = process.env.MANAGER_SOCKET_URL ?? "http://localhost:8671";
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
// Request a worker ID and namespace from the manager
|
// Request a worker ID and namespace from the manager
|
||||||
socket.emit(
|
socket.emit(
|
||||||
@@ -53,7 +55,7 @@ export async function createWorkerJob<T>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Connect to the worker-specific namespace
|
// Connect to the worker-specific namespace
|
||||||
const workerSocket = io(`http://localhost:8671/worker-${workerId}`, {
|
const workerSocket = io(`${managerUrl}/worker-${workerId}`, {
|
||||||
reconnection: false,
|
reconnection: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -163,6 +163,7 @@ if (import.meta.main) {
|
|||||||
// Register job handler for DALPURI_FULL_SYNC
|
// Register job handler for DALPURI_FULL_SYNC
|
||||||
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager");
|
||||||
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync");
|
||||||
|
const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync");
|
||||||
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => {
|
||||||
const socket = await ensureManagerSocketReady();
|
const socket = await ensureManagerSocketReady();
|
||||||
await enqueueDalpuriFullSync(socket);
|
await enqueueDalpuriFullSync(socket);
|
||||||
@@ -170,10 +171,37 @@ if (import.meta.main) {
|
|||||||
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
console.log("[worker] Registered DALPURI_FULL_SYNC job handler");
|
||||||
|
|
||||||
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
await boss.work(WorkerQueue.DALPURI_INCREMENTAL_SYNC, async () => {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
console.log("[worker] DALPURI_INCREMENTAL_SYNC started");
|
||||||
|
try {
|
||||||
await executeIncrementalSync();
|
await executeIncrementalSync();
|
||||||
|
console.log(
|
||||||
|
`[worker] DALPURI_INCREMENTAL_SYNC completed in ${Date.now() - startedAt}ms`
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(
|
||||||
|
`[worker] DALPURI_INCREMENTAL_SYNC failed in ${Date.now() - startedAt}ms`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
console.log("[worker] Registered DALPURI_INCREMENTAL_SYNC job handler");
|
||||||
|
|
||||||
|
const enqueueIncrementalWithLogging = () => {
|
||||||
|
enqueueIncrementalSync().catch((err) => {
|
||||||
|
console.error(
|
||||||
|
`[worker] interval enqueueIncrementalSync failed: ${err?.message ?? err}`
|
||||||
|
);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
// Keep a worker-local 5s scheduler so incremental sync continues even when
|
||||||
|
// API interval scheduling is unavailable.
|
||||||
|
enqueueIncrementalWithLogging();
|
||||||
|
setInterval(enqueueIncrementalWithLogging, 5_000);
|
||||||
|
console.log("[worker] Started 5-second incremental enqueue interval");
|
||||||
|
|
||||||
// Register job handler for REFRESH_SALES_METRICS
|
// Register job handler for REFRESH_SALES_METRICS
|
||||||
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
const { executeSalesMetricsRefresh } = await import("./modules/workers/sales-metrics");
|
||||||
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
await boss.work(WorkerQueue.REFRESH_SALES_METRICS, async (jobs) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user