/**
 * @module opportunityCache
 *
 * Redis-backed cache for expensive ConnectWise API data associated
 * with opportunities.
 *
 * ## What is cached
 *
 * Each tracked opportunity may have several cached payloads. Most are keyed
 * by its `cwOpportunityId`; company-level data is keyed by the CW company ID:
 *
 * - **CW opportunity** (`opp:cw-data:{cwOpportunityId}`) — the raw CW
 *   opportunity response.
 * - **Activities** (`opp:activities:{cwOpportunityId}`) — the raw
 *   `CWActivity[]` array fetched from `activityCw.fetchByOpportunityDirect()`.
 * - **Company CW data** (`opp:company-cw:{cw_CompanyId}`) — the hydrated
 *   company / contacts blob set by `CompanyController.hydrateCwData()`.
 * - **Notes** (`opp:notes:{cwOpportunityId}`) — raw CW notes array.
 * - **Contacts** (`opp:contacts:{cwOpportunityId}`) — raw CW contacts array.
 * - **Products** (`opp:products:{cwOpportunityId}`) — raw CW forecast +
 *   procurement products blob.
 * - **Site** (`opp:site:{cw_CompanyId}:{cwSiteId}`) — raw CW company site,
 *   cached with a fixed TTL.
 *
 * TTLs are computed dynamically via {@link computeCacheTTL} so hot
 * opportunities refresh every 30 s while stale ones live for 15 min.
 *
 * ## Background refresh
 *
 * {@link refreshOpportunityCache} is designed to be called on a
 * 30-second interval from `src/index.ts`. It scans all non-closed
 * DB opportunities (plus those closed within the last 30 days), checks
 * which cache keys have expired, and re-fetches only those from ConnectWise.
*/
import { prisma, redis } from "../../constants";
import { activityCw } from "../cw-utils/activities/activities";
import { computeCacheTTL } from "../algorithms/computeCacheTTL";
import { computeSubResourceCacheTTL } from "../algorithms/computeSubResourceCacheTTL";
import {
  computeProductsCacheTTL,
  PRODUCTS_TTL_HOT,
} from "../algorithms/computeProductsCacheTTL";
import { connectWiseApi } from "../../constants";
import { fetchCwCompanyById } from "../cw-utils/fetchCompany";
import { fetchCompanySite } from "../cw-utils/sites/companySites";
import { opportunityCw } from "../cw-utils/opportunities/opportunities";
import { withCwRetry } from "../cw-utils/withCwRetry";
import { events } from "../globalEvents";

// ---------------------------------------------------------------------------
// Key helpers
// ---------------------------------------------------------------------------

// One Redis key namespace per cached payload type.
const ACTIVITY_PREFIX = "opp:activities:";
const COMPANY_CW_PREFIX = "opp:company-cw:";
const NOTES_PREFIX = "opp:notes:";
const CONTACTS_PREFIX = "opp:contacts:";
const PRODUCTS_PREFIX = "opp:products:";
const SITE_PREFIX = "opp:site:";
const OPP_CW_PREFIX = "opp:cw-data:";

/** Builds the activities cache key for the given CW opportunity ID. */
export const activityCacheKey = (cwOppId: number): string =>
  ACTIVITY_PREFIX + cwOppId;

/** Builds the company CW hydration cache key for the given CW company ID. */
export const companyCwCacheKey = (cwCompanyId: number): string =>
  COMPANY_CW_PREFIX + cwCompanyId;

/** Builds the notes cache key for the given CW opportunity ID. */
export const notesCacheKey = (cwOppId: number): string =>
  NOTES_PREFIX + cwOppId;

/** Builds the contacts cache key for the given CW opportunity ID. */
export const contactsCacheKey = (cwOppId: number): string =>
  CONTACTS_PREFIX + cwOppId;

/** Redis key for cached opportunity products by CW opportunity ID.
*/
export const productsCacheKey = (cwOppId: number) =>
  `${PRODUCTS_PREFIX}${cwOppId}`;

/** Redis key for cached company site by CW company ID + site ID. */
export const siteCacheKey = (cwCompanyId: number, cwSiteId: number) =>
  `${SITE_PREFIX}${cwCompanyId}:${cwSiteId}`;

/** Redis key for cached CW opportunity response by CW opportunity ID. */
export const oppCwDataCacheKey = (cwOppId: number) =>
  `${OPP_CW_PREFIX}${cwOppId}`;

// ---------------------------------------------------------------------------
// Read helpers
// ---------------------------------------------------------------------------

/**
 * Retrieve cached CW activities for an opportunity.
 *
 * A stored value that cannot be parsed as JSON is treated as a cache miss.
 *
 * @param cwOpportunityId - The CW opportunity ID the activities belong to.
 * @returns The parsed `CWActivity[]` or `null` on cache miss.
 */
export async function getCachedActivities(
  cwOpportunityId: number,
): Promise<any[] | null> {
  const raw = await redis.get(activityCacheKey(cwOpportunityId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

/**
 * Retrieve cached company CW hydration data.
 *
 * A stored value that cannot be parsed as JSON is treated as a cache miss.
 *
 * @param cwCompanyId - The CW company ID the blob was cached under.
 * @returns `{ company, defaultContact, allContacts }` or `null` on cache miss.
 */
export async function getCachedCompanyCwData(
  cwCompanyId: number,
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
  const raw = await redis.get(companyCwCacheKey(cwCompanyId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

/**
 * Retrieve cached opportunity notes (raw CW data).
 *
 * A stored value that cannot be parsed as JSON is treated as a cache miss.
 *
 * @param cwOpportunityId - The CW opportunity ID the notes belong to.
 * @returns The parsed raw CW notes array or `null` on cache miss.
 */
export async function getCachedNotes(
  cwOpportunityId: number,
): Promise<any[] | null> {
  const raw = await redis.get(notesCacheKey(cwOpportunityId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

/**
 * Retrieve cached opportunity contacts (raw CW data).
 *
 * @returns The parsed raw CW contacts array or `null` on cache miss.
*/
export async function getCachedContacts(
  cwOpportunityId: number,
): Promise<any[] | null> {
  const raw = await redis.get(contactsCacheKey(cwOpportunityId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

/**
 * Retrieve cached opportunity products (raw CW forecast + procurement blob).
 *
 * A stored value that cannot be parsed as JSON is treated as a cache miss.
 *
 * @param cwOpportunityId - The CW opportunity ID the products belong to.
 * @returns `{ forecast, procProducts }` or `null` on cache miss.
 */
export async function getCachedProducts(
  cwOpportunityId: number,
): Promise<{ forecast: any; procProducts: any[] } | null> {
  const raw = await redis.get(productsCacheKey(cwOpportunityId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

/**
 * Retrieve cached CW site data for a company/site pair.
 *
 * A stored value that cannot be parsed as JSON is treated as a cache miss.
 *
 * @param cwCompanyId - The CW company ID that owns the site.
 * @param cwSiteId - The CW site ID.
 * @returns Parsed site data or `null` on cache miss.
 */
export async function getCachedSite(
  cwCompanyId: number,
  cwSiteId: number,
): Promise<any | null> {
  const raw = await redis.get(siteCacheKey(cwCompanyId, cwSiteId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

/**
 * Retrieve cached CW opportunity response data.
 *
 * A stored value that cannot be parsed as JSON is treated as a cache miss.
 *
 * @param cwOpportunityId - The CW opportunity ID.
 * @returns Parsed CW opportunity object or `null` on cache miss.
 */
export async function getCachedOppCwData(
  cwOpportunityId: number,
): Promise<any | null> {
  const raw = await redis.get(oppCwDataCacheKey(cwOpportunityId));
  if (!raw) return null;
  try {
    return JSON.parse(raw);
  } catch {
    // Corrupt payload: behave like a miss so the caller re-fetches.
    return null;
  }
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Check whether an error is an Axios 404 (resource not found in CW).
 *
 * @param err - Any thrown value; non-objects and `null` are never a match.
 * @returns `true` only when `err.isAxiosError === true` and the response
 *   status is 404.
 */
function isNotFoundError(err: unknown): boolean {
  if (typeof err !== "object" || err === null) return false;
  const e = err as Record<string, any>;
  return e.isAxiosError === true && e.response?.status === 404;
}

/**
 * Check whether an error is a transient network / timeout error.
 *
 * These are safe to swallow in background refresh tasks — CW will be
 * retried on the next refresh cycle.
This predicate never logs by itself; callers emit a
 * concise one-line warning (built with describeError) instead of dumping
 * the full Axios error object.
 *
 * @param err - Any thrown value; only Axios errors can match.
 * @returns `true` when the Axios error code is one of the network / timeout
 *   codes listed below.
 */
function isTransientError(err: unknown): boolean {
  if (typeof err !== "object" || err === null) return false;
  const e = err as Record<string, any>;
  if (!e.isAxiosError) return false;
  const code = e.code as string | undefined;
  return (
    code === "ECONNABORTED" ||
    code === "ECONNREFUSED" ||
    code === "ECONNRESET" ||
    code === "ETIMEDOUT" ||
    code === "ERR_NETWORK" ||
    code === "ENETUNREACH" ||
    code === "ERR_BAD_RESPONSE"
  );
}

/**
 * Build a concise error description for logging (avoids dumping entire Axios objects).
 *
 * For Axios errors the result has the shape
 * `METHOD url → CODE (message)`, falling back to `HTTP <status>` when the
 * error carries no code. Any other value is reduced to its `message`, or to
 * `String(err)` when there is none.
 *
 * @param err - Any thrown value.
 * @returns A single-line, human-readable description.
 */
function describeError(err: unknown): string {
  if (typeof err !== "object" || err === null) return String(err);
  const e = err as Record<string, any>;
  if (e.isAxiosError) {
    const method = (e.config?.method ?? "?").toUpperCase();
    const url = e.config?.url ?? "unknown";
    const code = e.code ?? "";
    const status = e.response?.status ?? "";
    return `${method} ${url} → ${code || `HTTP ${status}`} (${e.message})`;
  }
  return e.message ?? String(err);
}

/**
 * When true, transient-error warnings inside fetchAndCache* are suppressed.
 * Used during background refresh to avoid flooding the terminal — the
 * refresh function prints a single summary line instead.
 *
 * NOTE(review): nothing in the visible module reads or assigns this flag —
 * the fetchAndCache* helpers always warn. Either wire it up or remove it.
 */
let _suppressTransientWarnings = false;

// ---------------------------------------------------------------------------
// Write helpers
// ---------------------------------------------------------------------------

/**
 * Fetch activities from CW and cache them with the appropriate TTL.
 *
 * Returns an empty array if CW responds with 404 (opportunity doesn't
 * exist or was deleted upstream).
 *
 * @returns The raw `CWActivity[]` collection (as plain array).
*/
export async function fetchAndCacheActivities(
  cwOpportunityId: number,
  ttlMs: number,
): Promise<any[]> {
  try {
    // Use the direct (single-call) variant to avoid the extra count request
    const arr = await activityCw.fetchByOpportunityDirect(cwOpportunityId);
    await redis.set(
      activityCacheKey(cwOpportunityId),
      JSON.stringify(arr),
      "PX",
      ttlMs,
    );
    return arr;
  } catch (err) {
    if (isNotFoundError(err)) return [];
    if (isTransientError(err)) {
      // Nothing is cached on a transient failure; the key stays missing.
      console.warn(
        `[cache] activities opp#${cwOpportunityId}: ${describeError(err)}`,
      );
      return [];
    }
    throw err;
  }
}

/**
 * Fetch company CW data (company, contacts) and cache with the given TTL.
 *
 * Nothing is cached when the company lookup yields no company, when CW
 * answers 404, or when a transient network error occurs; `null` is returned
 * in all three cases. Any other error is rethrown.
 *
 * @param cwCompanyId - The CW company ID to hydrate.
 * @param ttlMs - Cache TTL in milliseconds.
 * @returns The hydration blob or `null` if the company doesn't exist in CW.
 */
export async function fetchAndCacheCompanyCwData(
  cwCompanyId: number,
  ttlMs: number,
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
  try {
    // Fetch company and all-contacts in parallel — the allContacts URL
    // can be constructed directly without the company response.
    const [cwCompany, allContactsData] = await Promise.all([
      fetchCwCompanyById(cwCompanyId),
      withCwRetry(
        () =>
          connectWiseApi.get(
            `/company/companies/${cwCompanyId}/contacts?pageSize=1000`,
          ),
        { label: `company#${cwCompanyId}/allContacts` },
      ),
    ]);

    if (!cwCompany) return null;

    // Guard against a non-array payload so the lookup below cannot throw a
    // TypeError and the cached blob always matches the declared shape.
    const allContacts: any[] = Array.isArray(allContactsData?.data)
      ? allContactsData.data
      : [];

    // Default contact: derive from allContacts instead of making an
    // extra serial CW call. The company object carries the default
    // contact's ID, so we can pull it from the list we already fetched.
    const defaultContactId = cwCompany.defaultContact?.id;
    const defaultContactData = defaultContactId
      ? (allContacts.find((c: any) => c.id === defaultContactId) ?? null)
      : null;

    const blob = {
      company: cwCompany,
      defaultContact: defaultContactData,
      allContacts,
    };
    await redis.set(
      companyCwCacheKey(cwCompanyId),
      JSON.stringify(blob),
      "PX",
      ttlMs,
    );
    return blob;
  } catch (err) {
    if (isNotFoundError(err)) return null;
    if (isTransientError(err)) {
      console.warn(`[cache] company#${cwCompanyId}: ${describeError(err)}`);
      return null;
    }
    throw err;
  }
}

/**
 * Fetch opportunity notes from CW and cache the raw response.
 *
 * Returns an empty array (and caches nothing) if CW responds with 404 or a
 * transient network error occurs. Any other error is rethrown.
 *
 * @param cwOpportunityId - The CW opportunity ID.
 * @param ttlMs - Cache TTL in milliseconds.
 * @returns The raw CW notes array.
 */
export async function fetchAndCacheNotes(
  cwOpportunityId: number,
  ttlMs: number,
): Promise<any[]> {
  try {
    const notes = await opportunityCw.fetchNotes(cwOpportunityId);
    await redis.set(
      notesCacheKey(cwOpportunityId),
      JSON.stringify(notes),
      "PX",
      ttlMs,
    );
    return notes;
  } catch (err) {
    if (isNotFoundError(err)) return [];
    if (isTransientError(err)) {
      console.warn(
        `[cache] notes opp#${cwOpportunityId}: ${describeError(err)}`,
      );
      return [];
    }
    throw err;
  }
}

/**
 * Fetch opportunity contacts from CW and cache the raw response.
 *
 * Returns an empty array (and caches nothing) if CW responds with 404 or a
 * transient network error occurs. Any other error is rethrown.
 *
 * @param cwOpportunityId - The CW opportunity ID.
 * @param ttlMs - Cache TTL in milliseconds.
 * @returns The raw CW contacts array.
 */
export async function fetchAndCacheContacts(
  cwOpportunityId: number,
  ttlMs: number,
): Promise<any[]> {
  try {
    const contacts = await opportunityCw.fetchContacts(cwOpportunityId);
    await redis.set(
      contactsCacheKey(cwOpportunityId),
      JSON.stringify(contacts),
      "PX",
      ttlMs,
    );
    return contacts;
  } catch (err) {
    if (isNotFoundError(err)) return [];
    if (isTransientError(err)) {
      console.warn(
        `[cache] contacts opp#${cwOpportunityId}: ${describeError(err)}`,
      );
      return [];
    }
    throw err;
  }
}

/**
 * Invalidate cached notes for an opportunity.
 *
 * Call this after any note mutation (create, update, delete) so the
 * next read refreshes from ConnectWise.
*/
export async function invalidateNotesCache(
  cwOpportunityId: number,
): Promise<void> {
  await redis.del(notesCacheKey(cwOpportunityId));
}

/**
 * Invalidate cached contacts for an opportunity.
 *
 * Call this after any contact mutation so the next read refreshes
 * from ConnectWise.
 *
 * @param cwOpportunityId - The CW opportunity ID whose contacts key is deleted.
 */
export async function invalidateContactsCache(
  cwOpportunityId: number,
): Promise<void> {
  await redis.del(contactsCacheKey(cwOpportunityId));
}

/**
 * Fetch opportunity products (forecast + procurement) from CW and cache.
 *
 * Stores both the forecast response and procurement products together
 * so that `fetchProducts()` can reconstruct ForecastProductControllers
 * from a single cache hit.
 *
 * On a CW 404 or a transient network error nothing is cached and an empty
 * blob (`{ forecast: { forecastItems: [] }, procProducts: [] }`) is
 * returned. Any other error is rethrown.
 *
 * @param cwOpportunityId - The CW opportunity ID.
 * @param ttlMs - Cache TTL in milliseconds.
 * @returns `{ forecast, procProducts }` blob.
 */
export async function fetchAndCacheProducts(
  cwOpportunityId: number,
  ttlMs: number,
): Promise<{ forecast: any; procProducts: any[] }> {
  // Built fresh on every use so callers never share (and mutate) one fallback.
  const emptyProducts = (): { forecast: any; procProducts: any[] } => ({
    forecast: { forecastItems: [] },
    procProducts: [],
  });

  try {
    const [forecast, procProducts] = await Promise.all([
      opportunityCw.fetchProducts(cwOpportunityId),
      opportunityCw.fetchProcurementProducts(cwOpportunityId),
    ]);
    const blob = { forecast, procProducts };
    await redis.set(
      productsCacheKey(cwOpportunityId),
      JSON.stringify(blob),
      "PX",
      ttlMs,
    );
    return blob;
  } catch (err) {
    if (isNotFoundError(err)) return emptyProducts();
    if (isTransientError(err)) {
      console.warn(
        `[cache] products opp#${cwOpportunityId}: ${describeError(err)}`,
      );
      return emptyProducts();
    }
    throw err;
  }
}

/**
 * Invalidate cached products for an opportunity.
 *
 * Call this after any product mutation (add, update, resequence) so the
 * next read refreshes from ConnectWise.
 *
 * @param cwOpportunityId - The CW opportunity ID whose products key is deleted.
 */
export async function invalidateProductsCache(
  cwOpportunityId: number,
): Promise<void> {
  await redis.del(productsCacheKey(cwOpportunityId));
}

/**
 * Site TTL — 20 minutes. Site/address data rarely changes so we cache
 * aggressively. The background refresh does NOT proactively warm site keys;
 * they are populated lazily on the first detail-view request.
*/
const SITE_TTL_MS = 1_200_000;

/**
 * Fetch a CW company site from ConnectWise and cache the result.
 *
 * The entry is stored with the fixed {@link SITE_TTL_MS} TTL. On a CW 404 or
 * a transient network error nothing is cached and `null` is returned; any
 * other error is rethrown.
 *
 * @param cwCompanyId - The CW company ID that owns the site.
 * @param cwSiteId - The CW site ID.
 * @returns The raw CW site object, or `null` when it could not be fetched.
 */
export async function fetchAndCacheSite(
  cwCompanyId: number,
  cwSiteId: number,
): Promise<any | null> {
  try {
    const site = await fetchCompanySite(cwCompanyId, cwSiteId);
    await redis.set(
      siteCacheKey(cwCompanyId, cwSiteId),
      JSON.stringify(site),
      "PX",
      SITE_TTL_MS,
    );
    return site;
  } catch (err) {
    if (isNotFoundError(err)) return null;
    if (isTransientError(err)) {
      console.warn(
        `[cache] site company#${cwCompanyId}/site#${cwSiteId}: ${describeError(err)}`,
      );
      return null;
    }
    throw err;
  }
}

/**
 * Fetch the raw CW opportunity response from ConnectWise and cache it.
 *
 * Used by `fetchItem()` in the manager to avoid a CW roundtrip when
 * the detail view is reloaded within the cache TTL window.
 *
 * On a CW 404 or a transient network error nothing is cached and `null` is
 * returned; any other error is rethrown.
 *
 * @param cwOpportunityId - The CW opportunity ID
 * @param ttlMs - Cache TTL in milliseconds
 * @returns The raw CW opportunity response object, or `null` when it could
 *   not be fetched.
 */
export async function fetchAndCacheOppCwData(
  cwOpportunityId: number,
  ttlMs: number,
): Promise<any | null> {
  try {
    const cwData = await opportunityCw.fetch(cwOpportunityId);
    await redis.set(
      oppCwDataCacheKey(cwOpportunityId),
      JSON.stringify(cwData),
      "PX",
      ttlMs,
    );
    return cwData;
  } catch (err) {
    if (isNotFoundError(err)) return null;
    if (isTransientError(err)) {
      console.warn(`[cache] opp#${cwOpportunityId}: ${describeError(err)}`);
      return null;
    }
    throw err;
  }
}

// ---------------------------------------------------------------------------
// Background refresh
// ---------------------------------------------------------------------------

/**
 * Refresh the opportunity cache.
 *
 * Scans all non-closed opportunities in the database (plus those closed
 * within the last 30 days), computes a TTL for each, checks whether the
 * cache key still exists, and re-fetches from ConnectWise only for entries
 * that have expired.
 *
 * Designed to be called every **30 seconds** from the process entry point.
*/
export async function refreshOpportunityCache(): Promise<void> {
  // Include non-closed AND recently-closed (within 30 days) opportunities
  const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);

  const opportunities = await prisma.opportunity.findMany({
    where: {
      OR: [
        { closedFlag: false },
        { closedFlag: true, closedDate: { gte: thirtyDaysAgo } },
      ],
    },
    select: {
      cwOpportunityId: true,
      closedFlag: true,
      closedDate: true,
      expectedCloseDate: true,
      cwLastUpdated: true,
      statusCwId: true,
      company: { select: { cw_CompanyId: true } },
    },
    orderBy: { cwLastUpdated: "desc" },
  });

  events.emit("cache:opportunities:refresh:started", {
    totalOpportunities: opportunities.length,
  });

  let activitiesRefreshed = 0;
  let companiesRefreshed = 0;
  let notesRefreshed = 0;
  let contactsRefreshed = 0;
  let productsRefreshed = 0;
  let oppCwDataRefreshed = 0;
  let skipped = 0;

  // Batch-check which keys already exist via a single pipeline.
  // Layout of the pipeline results:
  //   [0 .. 5·N)  — 5 EXISTS per opportunity, in the order
  //                 oppCwData, activities, notes, contacts, products
  //   [5·N .. )   — 1 EXISTS per *distinct* CW company ID, first-seen order
  const EXISTS_PER_OPP = 5;
  const pipeline = redis.pipeline();
  for (const opp of opportunities) {
    pipeline.exists(oppCwDataCacheKey(opp.cwOpportunityId));
    pipeline.exists(activityCacheKey(opp.cwOpportunityId));
    pipeline.exists(notesCacheKey(opp.cwOpportunityId));
    pipeline.exists(contactsCacheKey(opp.cwOpportunityId));
    pipeline.exists(productsCacheKey(opp.cwOpportunityId));
  }

  // Many opportunities share a company. Check each company key once here
  // instead of queueing one (mostly no-op) refresh task per opportunity,
  // which would eat batch slots and inter-batch delays further down.
  const companyResultIdx = new Map<number, number>();
  for (const opp of opportunities) {
    const cwCompanyId = opp.company?.cw_CompanyId;
    if (cwCompanyId && !companyResultIdx.has(cwCompanyId)) {
      const resultIdx =
        opportunities.length * EXISTS_PER_OPP + companyResultIdx.size;
      companyResultIdx.set(cwCompanyId, resultIdx);
      pipeline.exists(companyCwCacheKey(cwCompanyId));
    }
  }

  const existsResults = await pipeline.exec();

  // existsResults entries are [error, result] tuples; anything other than a
  // clean `1` (including a per-command error) is treated as "key missing".
  const keyExists = (resultIdx: number): boolean =>
    existsResults?.[resultIdx]?.[1] === 1;

  const refreshTasks: (() => Promise<void>)[] = [];
  // Companies already decided on in this cycle (queued or found cached).
  const handledCompanies = new Set<number>();

  for (const [i, opp] of opportunities.entries()) {
    const cacheTTLInput = {
      closedFlag: opp.closedFlag,
      closedDate: opp.closedDate,
      expectedCloseDate: opp.expectedCloseDate,
      lastUpdated: opp.cwLastUpdated,
    };
    const ttl = computeCacheTTL(cacheTTLInput);
    const subTTL = computeSubResourceCacheTTL(cacheTTLInput);
    const productsTTL = computeProductsCacheTTL({
      ...cacheTTLInput,
      statusCwId: opp.statusCwId,
    });

    // Skip closed (ttl === null) — should not happen because of the query filter,
    // but guard anyway.
    if (ttl === null) {
      skipped++;
      continue;
    }

    const baseIdx = i * EXISTS_PER_OPP;
    const oppCwDataExists = keyExists(baseIdx);
    const activityExists = keyExists(baseIdx + 1);
    const notesExist = keyExists(baseIdx + 2);
    const contactsExist = keyExists(baseIdx + 3);
    const productsExist = keyExists(baseIdx + 4);

    // Proactively cache the CW opportunity response itself
    if (!oppCwDataExists) {
      refreshTasks.push(async () => {
        await fetchAndCacheOppCwData(opp.cwOpportunityId, ttl);
        oppCwDataRefreshed++;
      });
    }

    if (!activityExists) {
      refreshTasks.push(async () => {
        await fetchAndCacheActivities(opp.cwOpportunityId, ttl);
        activitiesRefreshed++;
      });
    }

    // Refresh notes/contacts if sub-resource TTL applies and key is missing
    if (subTTL !== null) {
      if (!notesExist) {
        refreshTasks.push(async () => {
          await fetchAndCacheNotes(opp.cwOpportunityId, subTTL);
          notesRefreshed++;
        });
      }
      if (!contactsExist) {
        refreshTasks.push(async () => {
          await fetchAndCacheContacts(opp.cwOpportunityId, subTTL);
          contactsRefreshed++;
        });
      }
    }

    // Proactively refresh products only for hot opps (updated within 3 days).
    // 30-minute lazy-cached products are filled on-demand by the endpoint
    // and do not need background refresh.
    if (productsTTL === PRODUCTS_TTL_HOT && !productsExist) {
      refreshTasks.push(async () => {
        await fetchAndCacheProducts(opp.cwOpportunityId, productsTTL);
        productsRefreshed++;
      });
    }

    // Also refresh company CW data if the key is missing. Only the first
    // non-skipped opportunity of each company gets to decide; because the
    // list is ordered by cwLastUpdated desc, that is the most recently
    // updated one, and its TTL is the one applied to the company blob.
    const cwCompanyId = opp.company?.cw_CompanyId;
    if (cwCompanyId && !handledCompanies.has(cwCompanyId)) {
      handledCompanies.add(cwCompanyId);
      const resultIdx = companyResultIdx.get(cwCompanyId);
      const companyExists = resultIdx !== undefined && keyExists(resultIdx);
      if (!companyExists) {
        refreshTasks.push(async () => {
          await fetchAndCacheCompanyCwData(cwCompanyId, ttl);
          companiesRefreshed++;
        });
      }
    }
  }

  // Run refresh thunks with bounded concurrency and inter-batch delay.
  // Each thunk is only invoked here — no requests fire until we call them.
  // CW rate-limits aggressively so we keep this conservative.
  const CONCURRENCY = 6;
  const BATCH_DELAY_MS = 250;
  // Counts rejected thunks, i.e. errors the fetchAndCache* helpers rethrew.
  let failedTaskCount = 0;

  for (let start = 0; start < refreshTasks.length; start += CONCURRENCY) {
    const batch = refreshTasks.slice(start, start + CONCURRENCY);
    const results = await Promise.allSettled(batch.map((run) => run()));
    for (const outcome of results) {
      if (outcome.status === "rejected") failedTaskCount++;
    }

    // Small delay between batches to avoid overwhelming CW
    if (start + CONCURRENCY < refreshTasks.length) {
      await new Promise((resolve) => setTimeout(resolve, BATCH_DELAY_MS));
    }
  }

  if (failedTaskCount > 0) {
    console.warn(
      `[cache] refresh: ${failedTaskCount} task(s) failed (likely CW timeouts) — will retry next cycle`,
    );
  }

  events.emit("cache:opportunities:refresh:completed", {
    totalOpportunities: opportunities.length,
    activitiesRefreshed,
    companiesRefreshed,
    notesRefreshed,
    contactsRefreshed,
    productsRefreshed,
    oppCwDataRefreshed,
    skipped,
  });
}