diff --git a/api/src/workert.ts b/api/src/workert.ts index 81dfc67..3035633 100644 --- a/api/src/workert.ts +++ b/api/src/workert.ts @@ -127,19 +127,25 @@ export async function reserveWorkerId(queueType: WorkerQueue): Promise { async function ensureDalpuriSyncQueue(): Promise { try { + console.log("[worker] Creating DALPURI_FULL_SYNC queue..."); await boss.createQueue(WorkerQueue.DALPURI_FULL_SYNC); - } catch { - // Queue may already exist; ignore to keep this idempotent. + console.log("[worker] DALPURI_FULL_SYNC queue ready"); + } catch (err) { + console.log("[worker] DALPURI_FULL_SYNC queue already exists (or error):", (err as Error).message); } try { + console.log("[worker] Creating DALPURI_INCREMENTAL_SYNC queue..."); await boss.createQueue(WorkerQueue.DALPURI_INCREMENTAL_SYNC); - } catch { - // Queue may already exist; ignore to keep this idempotent. + console.log("[worker] DALPURI_INCREMENTAL_SYNC queue ready"); + } catch (err) { + console.log("[worker] DALPURI_INCREMENTAL_SYNC queue already exists (or error):", (err as Error).message); } try { + console.log("[worker] Creating REFRESH_SALES_METRICS queue..."); await boss.createQueue(WorkerQueue.REFRESH_SALES_METRICS); - } catch { - // Queue may already exist; ignore to keep this idempotent. + console.log("[worker] REFRESH_SALES_METRICS queue ready"); + } catch (err) { + console.log("[worker] REFRESH_SALES_METRICS queue already exists (or error):", (err as Error).message); } } @@ -187,11 +193,16 @@ if (import.meta.main) { process.exit(1); } console.log("[worker] PgBoss started successfully"); + console.log("[worker] Ensuring sync queues..."); await ensureDalpuriSyncQueue(); + console.log("[worker] Sync queues ready"); // Register job handler for DALPURI_FULL_SYNC + console.log("[worker] Importing sync-manager..."); const { enqueueDalpuriFullSync } = await import("./modules/workers/sync-manager"); + console.log("[worker] Importing dalpuri-sync..."); const { executeIncrementalSync } = await import("./modules/workers/dalpuri-sync"); + console.log("[worker] Importing incremental-sync..."); const { enqueueIncrementalSync } = await import("./modules/workers/incremental-sync"); await boss.work(WorkerQueue.DALPURI_FULL_SYNC, async () => { const socket = await ensureManagerSocketReady();