feat: Redis opportunity cache, CW API retry/logging, adaptive TTLs

- Add Redis-backed opportunity cache with background refresh (30s interval)
- Fix concurrency bug: use lazy thunks instead of eager promises for batching
- Add withCwRetry utility with exponential backoff for transient CW errors
- Add adaptive TTL algorithms (primary, sub-resource, products) based on opportunity activity
- Add include query param on GET /sales/opportunities/:id (notes,contacts,products)
- Add opt-in CW API logger (LOG_CW_API env var) with timestamped files in cw-api-logs/
- Add debug-scripts/analyze-cw-calls.py for API call analysis
- Add computeSubResourceCacheTTL and computeProductsCacheTTL algorithms with tests
- Increase CW API timeout from 15s to 30s
- Unblock cache refresh from startup chain (remove await)
- Prioritize recently updated opportunities in refresh cycle
- Add CACHING.md documentation
- Update API_ROUTES.md with caching details and include param
- Update copilot instructions to require CACHING.md sync
- Add dev:log script for CW API call logging during development
This commit is contained in:
2026-03-02 23:23:24 -06:00
parent fe71248e88
commit 6d935e7180
33 changed files with 2634 additions and 176 deletions
@@ -0,0 +1,114 @@
/**
* @module computeProductsCacheTTL
*
* Adaptive Cache TTL for Opportunity Products
* ============================================
*
* Determines how long products (forecast items) should be cached in
* Redis before being re-fetched from ConnectWise.
*
* Products have unique caching rules compared to notes or contacts
* because they are typically finalised before a deal closes and do not
* change once the opportunity reaches a terminal status.
*
* ## Spec
*
* | # | Condition | TTL (ms) | TTL (human) | Rationale |
* |---|------------------------------------------------------------------------------|------------|-------------|---------------------------------------------------------------------------------------|
* | 1 | Status is **Won**, **Lost**, **Pending Won**, or **Pending Lost** | `null` | No cache | Products on terminal / near-terminal opps are static; no need to keep them warm. |
* | 2 | Opportunity is **not cacheable** (main cache TTL is `null`) | `null` | No cache | If the opp itself is evicted, sub-resources follow suit. |
* | 3 | `lastUpdated` is within the last **3 days** | 15 000 | 15 seconds | Actively-worked deals — products are being edited and need near-real-time freshness. |
* | 4 | Everything else | 1 800 000 | 30 minutes | Lazy on-demand cache: fetched when requested, expires after 30 min without refresh. |
*
* ## Evaluation order
*
* Rules are evaluated top-to-bottom; the first matching rule wins.
*
* ## Inputs
*
* Extends {@link CacheTTLInput} from `computeCacheTTL` with an
* additional `statusCwId` field used to identify terminal statuses.
*
* ## Output
*
* Returns `number | null`:
* - Positive integer = TTL in **milliseconds**.
* - `null` = do **not** cache.
*/
import type { CacheTTLInput } from "./computeCacheTTL";
import { computeCacheTTL } from "./computeCacheTTL";
import { QUOTE_STATUSES } from "../../types/QuoteStatuses";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** 15 seconds — TTL for hot products (opportunity updated within 3 days). */
export const PRODUCTS_TTL_HOT = 15_000;
/** 30 minutes — TTL for on-demand product cache (lazy fallback). */
export const PRODUCTS_TTL_LAZY = 1_800_000;
/** 3 days in milliseconds. */
const THREE_DAYS_MS = 3 * 24 * 60 * 60 * 1000;
/**
* Set of all CW status IDs that map to a Won or Lost canonical status.
*
* Built at module load from {@link QUOTE_STATUSES} so it stays in sync
* with any future status additions.
*/
export const WON_LOST_STATUS_IDS: ReadonlySet<number> = new Set(
QUOTE_STATUSES.filter((s) => s.wonFlag || s.lostFlag).flatMap((s) => [
s.id,
...s.optimaEquivalency,
]),
);
// ---------------------------------------------------------------------------
// Input type
// ---------------------------------------------------------------------------
export interface ProductsCacheTTLInput extends CacheTTLInput {
/** The CW status ID of the opportunity. */
statusCwId: number | null;
}
// ---------------------------------------------------------------------------
// Algorithm
// ---------------------------------------------------------------------------
/**
* Compute the cache TTL for an opportunity's products.
*
* @param input - The opportunity's activity signals plus status ID.
* @returns TTL in milliseconds, or `null` if products should not be cached.
*/
export function computeProductsCacheTTL(
input: ProductsCacheTTLInput,
): number | null {
const { statusCwId, lastUpdated, now = new Date() } = input;
// Rule 1 — Terminal statuses: Won / Lost / Pending Won / Pending Lost
if (statusCwId !== null && WON_LOST_STATUS_IDS.has(statusCwId)) {
return null;
}
// Rule 2 — If the opportunity itself is not cacheable, skip products too
const mainTTL = computeCacheTTL(input);
if (mainTTL === null) {
return null;
}
// Rule 3 — Hot: updated within the last 3 days
if (lastUpdated) {
const diff = Math.abs(now.getTime() - lastUpdated.getTime());
if (diff <= THREE_DAYS_MS) {
return PRODUCTS_TTL_HOT;
}
}
// Rule 4 — Lazy fallback
return PRODUCTS_TTL_LAZY;
}
@@ -0,0 +1,118 @@
/**
* @module computeSubResourceCacheTTL
*
* Adaptive Cache TTL for Opportunity Sub-Resources
* =================================================
*
* Determines how long cached sub-resource data (notes, contacts) should
* live before being re-fetched from ConnectWise.
*
* Sub-resources change less frequently than the opportunity record itself
* or its activity feed, so TTLs are longer than the primary cache. The
* same activity-signal heuristics are used (expected close date, last
* updated, closed status) but with relaxed durations.
*
* ## Spec
*
* | # | Condition | TTL (ms) | TTL (human) | Rationale |
* |---|-------------------------------------------------------------------|----------|-------------|--------------------------------------------------------------------|
* | 1 | `closedFlag` is `true` AND closed > 30 days ago | `null` | Do not cache| Old closed records are rarely accessed. |
* | 1b| `closedFlag` is `true` AND closed within 30 days | 300 000 | 5 minutes | Recently-closed records may still be viewed occasionally. |
* | 2 | `expectedCloseDate` OR `lastUpdated` within **5 days** | 60 000 | 60 seconds | Active deals — contacts/notes may still change. |
* | 3 | `expectedCloseDate` OR `lastUpdated` within **14 days** | 120 000 | 2 minutes | Moderate activity — less likely to change. |
* | 4 | Everything else (older than 14 days) | 300 000 | 5 minutes | Low activity — safe to cache longer. |
*
* ## Evaluation order
*
* Rules are evaluated top-to-bottom; the first matching rule wins.
*
* ## Inputs
*
* Uses the same {@link CacheTTLInput} interface as `computeCacheTTL`.
*
* ## Output
*
* Returns `number | null`:
* - Positive integer = TTL in **milliseconds**.
* - `null` = do **not** cache.
*/
import type { CacheTTLInput } from "./computeCacheTTL";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** 60 seconds — TTL for high-activity sub-resources (within 5 days). */
export const SUB_TTL_HIGH_ACTIVITY = 60_000;
/** 2 minutes — TTL for moderate-activity sub-resources (within 14 days). */
export const SUB_TTL_MODERATE_ACTIVITY = 120_000;
/** 5 minutes — TTL for low-activity / stale sub-resources. */
export const SUB_TTL_LOW_ACTIVITY = 300_000;
/** 30 days in milliseconds. */
const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;
/** 5 days in milliseconds. */
const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000;
/** 14 days in milliseconds. */
const FOURTEEN_DAYS_MS = 14 * 24 * 60 * 60 * 1000;
// ---------------------------------------------------------------------------
// Algorithm
// ---------------------------------------------------------------------------
/**
* Compute the cache TTL for an opportunity sub-resource (notes, contacts).
*
* @param input - The opportunity's activity signals. See {@link CacheTTLInput}.
* @returns The TTL in milliseconds, or `null` if the data should not be cached.
*/
export function computeSubResourceCacheTTL(
input: CacheTTLInput,
): number | null {
const {
closedFlag,
closedDate,
expectedCloseDate,
lastUpdated,
now = new Date(),
} = input;
const nowMs = now.getTime();
const isWithinWindow = (date: Date | null, windowMs: number): boolean => {
if (!date) return false;
return Math.abs(nowMs - date.getTime()) <= windowMs;
};
// Rule 1 — Closed records
if (closedFlag) {
if (isWithinWindow(closedDate, THIRTY_DAYS_MS)) {
return SUB_TTL_LOW_ACTIVITY;
}
return null;
}
// Rule 2 — High activity (5 days)
if (
isWithinWindow(expectedCloseDate, FIVE_DAYS_MS) ||
isWithinWindow(lastUpdated, FIVE_DAYS_MS)
) {
return SUB_TTL_HIGH_ACTIVITY;
}
// Rule 3 — Moderate activity (14 days)
if (
isWithinWindow(expectedCloseDate, FOURTEEN_DAYS_MS) ||
isWithinWindow(lastUpdated, FOURTEEN_DAYS_MS)
) {
return SUB_TTL_MODERATE_ACTIVITY;
}
// Rule 4 — Low activity / stale
return SUB_TTL_LOW_ACTIVITY;
}
+546 -49
View File
@@ -13,6 +13,10 @@
* `CWActivity[]` array fetched from `activityCw.fetchByOpportunity()`.
* - **Company CW data** (`opp:company-cw:{cw_CompanyId}`) — the hydrated
* company / contacts blob set by `CompanyController.hydrateCwData()`.
* - **Notes** (`opp:notes:{cwOpportunityId}`) — raw CW notes array.
* - **Contacts** (`opp:contacts:{cwOpportunityId}`) — raw CW contacts array.
* - **Products** (`opp:products:{cwOpportunityId}`) — raw CW forecast +
* procurement products blob.
*
* TTLs are computed dynamically via {@link computeCacheTTL} so hot
* opportunities refresh every 30 s while stale ones live for 15 min.
@@ -28,8 +32,16 @@
import { prisma, redis } from "../../constants";
import { activityCw } from "../cw-utils/activities/activities";
import { computeCacheTTL } from "../algorithms/computeCacheTTL";
import { computeSubResourceCacheTTL } from "../algorithms/computeSubResourceCacheTTL";
import {
computeProductsCacheTTL,
PRODUCTS_TTL_HOT,
} from "../algorithms/computeProductsCacheTTL";
import { connectWiseApi } from "../../constants";
import { fetchCwCompanyById } from "../cw-utils/fetchCompany";
import { fetchCompanySite } from "../cw-utils/sites/companySites";
import { opportunityCw } from "../cw-utils/opportunities/opportunities";
import { withCwRetry } from "../cw-utils/withCwRetry";
import { events } from "../globalEvents";
// ---------------------------------------------------------------------------
@@ -38,6 +50,11 @@ import { events } from "../globalEvents";
const ACTIVITY_PREFIX = "opp:activities:";
const COMPANY_CW_PREFIX = "opp:company-cw:";
const NOTES_PREFIX = "opp:notes:";
const CONTACTS_PREFIX = "opp:contacts:";
const PRODUCTS_PREFIX = "opp:products:";
const SITE_PREFIX = "opp:site:";
const OPP_CW_PREFIX = "opp:cw-data:";
/** Redis key for cached activities by CW opportunity ID. */
export const activityCacheKey = (cwOppId: number) =>
@@ -47,6 +64,25 @@ export const activityCacheKey = (cwOppId: number) =>
export const companyCwCacheKey = (cwCompanyId: number) =>
`${COMPANY_CW_PREFIX}${cwCompanyId}`;
/** Redis key for cached opportunity notes by CW opportunity ID. */
export const notesCacheKey = (cwOppId: number) => `${NOTES_PREFIX}${cwOppId}`;
/** Redis key for cached opportunity contacts by CW opportunity ID. */
export const contactsCacheKey = (cwOppId: number) =>
`${CONTACTS_PREFIX}${cwOppId}`;
/** Redis key for cached opportunity products by CW opportunity ID. */
export const productsCacheKey = (cwOppId: number) =>
`${PRODUCTS_PREFIX}${cwOppId}`;
/** Redis key for cached company site by CW company ID + site ID. */
export const siteCacheKey = (cwCompanyId: number, cwSiteId: number) =>
`${SITE_PREFIX}${cwCompanyId}:${cwSiteId}`;
/** Redis key for cached CW opportunity response by CW opportunity ID. */
export const oppCwDataCacheKey = (cwOppId: number) =>
`${OPP_CW_PREFIX}${cwOppId}`;
// ---------------------------------------------------------------------------
// Read helpers
// ---------------------------------------------------------------------------
@@ -85,6 +121,147 @@ export async function getCachedCompanyCwData(
}
}
/**
* Retrieve cached opportunity notes (raw CW data).
*
* @returns The parsed raw CW notes array or `null` on cache miss.
*/
export async function getCachedNotes(
cwOpportunityId: number,
): Promise<any[] | null> {
const raw = await redis.get(notesCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached opportunity contacts (raw CW data).
*
* @returns The parsed raw CW contacts array or `null` on cache miss.
*/
export async function getCachedContacts(
cwOpportunityId: number,
): Promise<any[] | null> {
const raw = await redis.get(contactsCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached opportunity products (raw CW forecast + procurement blob).
*
* @returns `{ forecast, procProducts }` or `null` on cache miss.
*/
export async function getCachedProducts(
cwOpportunityId: number,
): Promise<{ forecast: any; procProducts: any[] } | null> {
const raw = await redis.get(productsCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached CW site data for a company/site pair.
*
* @returns Parsed site data or `null` on cache miss.
*/
export async function getCachedSite(
cwCompanyId: number,
cwSiteId: number,
): Promise<any | null> {
const raw = await redis.get(siteCacheKey(cwCompanyId, cwSiteId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached CW opportunity response data.
*
* @returns Parsed CW opportunity object or `null` on cache miss.
*/
export async function getCachedOppCwData(
cwOpportunityId: number,
): Promise<any | null> {
const raw = await redis.get(oppCwDataCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Check whether an error is an Axios 404 (resource not found in CW). */
function isNotFoundError(err: unknown): boolean {
if (typeof err !== "object" || err === null) return false;
const e = err as Record<string, any>;
return e.isAxiosError === true && e.response?.status === 404;
}
/**
* Check whether an error is a transient network / timeout error.
*
* These are safe to swallow in background refresh tasks — CW will be
* retried on the next refresh cycle. Logs a concise one-line warning
* instead of dumping the full Axios error object.
*/
function isTransientError(err: unknown): boolean {
if (typeof err !== "object" || err === null) return false;
const e = err as Record<string, any>;
if (!e.isAxiosError) return false;
const code = e.code as string | undefined;
return (
code === "ECONNABORTED" ||
code === "ECONNREFUSED" ||
code === "ECONNRESET" ||
code === "ETIMEDOUT" ||
code === "ERR_NETWORK" ||
code === "ENETUNREACH" ||
code === "ERR_BAD_RESPONSE"
);
}
/** Build a concise error description for logging (avoids dumping entire Axios objects). */
function describeError(err: unknown): string {
if (typeof err !== "object" || err === null) return String(err);
const e = err as Record<string, any>;
if (e.isAxiosError) {
const method = (e.config?.method ?? "?").toUpperCase();
const url = e.config?.url ?? "unknown";
const code = e.code ?? "";
const status = e.response?.status ?? "";
return `${method} ${url}${code || `HTTP ${status}`} (${e.message})`;
}
return e.message ?? String(err);
}
/**
* When true, transient-error warnings inside fetchAndCache* are suppressed.
* Used during background refresh to avoid flooding the terminal — the
* refresh function prints a single summary line instead.
*/
let _suppressTransientWarnings = false;
// ---------------------------------------------------------------------------
// Write helpers
// ---------------------------------------------------------------------------
@@ -92,21 +269,35 @@ export async function getCachedCompanyCwData(
/**
* Fetch activities from CW and cache them with the appropriate TTL.
*
* Returns an empty array if CW responds with 404 (opportunity doesn't
* exist or was deleted upstream).
*
* @returns The raw `CWActivity[]` collection (as plain array).
*/
export async function fetchAndCacheActivities(
cwOpportunityId: number,
ttlMs: number,
): Promise<any[]> {
const collection = await activityCw.fetchByOpportunity(cwOpportunityId);
const arr = collection.map((item) => item);
await redis.set(
activityCacheKey(cwOpportunityId),
JSON.stringify(arr),
"PX",
ttlMs,
);
return arr;
try {
// Use the direct (single-call) variant to avoid the extra count request
const arr = await activityCw.fetchByOpportunityDirect(cwOpportunityId);
await redis.set(
activityCacheKey(cwOpportunityId),
JSON.stringify(arr),
"PX",
ttlMs,
);
return arr;
} catch (err) {
if (isNotFoundError(err)) return [];
if (isTransientError(err)) {
console.warn(
`[cache] activities opp#${cwOpportunityId}: ${describeError(err)}`,
);
return [];
}
throw err;
}
}
/**
@@ -118,32 +309,255 @@ export async function fetchAndCacheCompanyCwData(
cwCompanyId: number,
ttlMs: number,
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
const cwCompany = await fetchCwCompanyById(cwCompanyId);
if (!cwCompany) return null;
try {
const cwCompany = await fetchCwCompanyById(cwCompanyId);
if (!cwCompany) return null;
const contactHref = cwCompany.defaultContact?._info?.contact_href;
const defaultContactData = contactHref
? await connectWiseApi.get(contactHref)
: undefined;
const contactHref = cwCompany.defaultContact?._info?.contact_href;
const defaultContactData = contactHref
? await withCwRetry(() => connectWiseApi.get(contactHref), {
label: `company#${cwCompanyId}/defaultContact`,
})
: undefined;
const allContactsData = await connectWiseApi.get(
`${cwCompany._info.contacts_href}&pageSize=1000`,
);
const allContactsData = await withCwRetry(
() =>
connectWiseApi.get(`${cwCompany._info.contacts_href}&pageSize=1000`),
{ label: `company#${cwCompanyId}/allContacts` },
);
const blob = {
company: cwCompany,
defaultContact: defaultContactData?.data ?? null,
allContacts: allContactsData.data,
};
const blob = {
company: cwCompany,
defaultContact: defaultContactData?.data ?? null,
allContacts: allContactsData.data,
};
await redis.set(
companyCwCacheKey(cwCompanyId),
JSON.stringify(blob),
"PX",
ttlMs,
);
await redis.set(
companyCwCacheKey(cwCompanyId),
JSON.stringify(blob),
"PX",
ttlMs,
);
return blob;
return blob;
} catch (err) {
if (isNotFoundError(err)) return null;
if (isTransientError(err)) {
console.warn(`[cache] company#${cwCompanyId}: ${describeError(err)}`);
return null;
}
throw err;
}
}
/**
* Fetch opportunity notes from CW and cache the raw response.
*
* Returns an empty array if CW responds with 404.
*
* @returns The raw CW notes array.
*/
export async function fetchAndCacheNotes(
cwOpportunityId: number,
ttlMs: number,
): Promise<any[]> {
try {
const notes = await opportunityCw.fetchNotes(cwOpportunityId);
await redis.set(
notesCacheKey(cwOpportunityId),
JSON.stringify(notes),
"PX",
ttlMs,
);
return notes;
} catch (err) {
if (isNotFoundError(err)) return [];
if (isTransientError(err)) {
console.warn(
`[cache] notes opp#${cwOpportunityId}: ${describeError(err)}`,
);
return [];
}
throw err;
}
}
/**
* Fetch opportunity contacts from CW and cache the raw response.
*
* Returns an empty array if CW responds with 404.
*
* @returns The raw CW contacts array.
*/
export async function fetchAndCacheContacts(
cwOpportunityId: number,
ttlMs: number,
): Promise<any[]> {
try {
const contacts = await opportunityCw.fetchContacts(cwOpportunityId);
await redis.set(
contactsCacheKey(cwOpportunityId),
JSON.stringify(contacts),
"PX",
ttlMs,
);
return contacts;
} catch (err) {
if (isNotFoundError(err)) return [];
if (isTransientError(err)) {
console.warn(
`[cache] contacts opp#${cwOpportunityId}: ${describeError(err)}`,
);
return [];
}
throw err;
}
}
/**
* Invalidate cached notes for an opportunity.
*
* Call this after any note mutation (create, update, delete) so the
* next read refreshes from ConnectWise.
*/
export async function invalidateNotesCache(
cwOpportunityId: number,
): Promise<void> {
await redis.del(notesCacheKey(cwOpportunityId));
}
/**
* Invalidate cached contacts for an opportunity.
*
* Call this after any contact mutation so the next read refreshes
* from ConnectWise.
*/
export async function invalidateContactsCache(
cwOpportunityId: number,
): Promise<void> {
await redis.del(contactsCacheKey(cwOpportunityId));
}
/**
* Fetch opportunity products (forecast + procurement) from CW and cache.
*
* Stores both the forecast response and procurement products together
* so that `fetchProducts()` can reconstruct ForecastProductControllers
* from a single cache hit.
*
* @returns `{ forecast, procProducts }` blob.
*/
export async function fetchAndCacheProducts(
cwOpportunityId: number,
ttlMs: number,
): Promise<{ forecast: any; procProducts: any[] }> {
try {
const [forecast, procProducts] = await Promise.all([
opportunityCw.fetchProducts(cwOpportunityId),
opportunityCw.fetchProcurementProducts(cwOpportunityId),
]);
const blob = { forecast, procProducts };
await redis.set(
productsCacheKey(cwOpportunityId),
JSON.stringify(blob),
"PX",
ttlMs,
);
return blob;
} catch (err) {
if (isNotFoundError(err))
return { forecast: { forecastItems: [] }, procProducts: [] };
if (isTransientError(err)) {
console.warn(
`[cache] products opp#${cwOpportunityId}: ${describeError(err)}`,
);
return { forecast: { forecastItems: [] }, procProducts: [] };
}
throw err;
}
}
/**
* Invalidate cached products for an opportunity.
*
* Call this after any product mutation (add, update, resequence) so the
* next read refreshes from ConnectWise.
*/
export async function invalidateProductsCache(
cwOpportunityId: number,
): Promise<void> {
await redis.del(productsCacheKey(cwOpportunityId));
}
/**
* Site TTL — 30 minutes. Site/address data rarely changes so we cache
* aggressively. The background refresh does NOT proactively warm site keys;
* they are populated lazily on the first detail-view request.
*/
const SITE_TTL_MS = 1_800_000;
/**
* Fetch a CW company site from ConnectWise and cache the result.
*
* @returns The raw CW site object.
*/
export async function fetchAndCacheSite(
cwCompanyId: number,
cwSiteId: number,
): Promise<any> {
try {
const site = await fetchCompanySite(cwCompanyId, cwSiteId);
await redis.set(
siteCacheKey(cwCompanyId, cwSiteId),
JSON.stringify(site),
"PX",
SITE_TTL_MS,
);
return site;
} catch (err) {
if (isNotFoundError(err)) return null;
if (isTransientError(err)) {
console.warn(
`[cache] site company#${cwCompanyId}/site#${cwSiteId}: ${describeError(err)}`,
);
return null;
}
throw err;
}
}
/**
* Fetch the raw CW opportunity response from ConnectWise and cache it.
*
* Used by `fetchItem()` in the manager to avoid a CW roundtrip when
* the detail view is reloaded within the cache TTL window.
*
* @param cwOpportunityId - The CW opportunity ID
* @param ttlMs - Cache TTL in milliseconds
* @returns The raw CW opportunity response object.
*/
export async function fetchAndCacheOppCwData(
cwOpportunityId: number,
ttlMs: number,
): Promise<any> {
try {
const cwData = await opportunityCw.fetch(cwOpportunityId);
await redis.set(
oppCwDataCacheKey(cwOpportunityId),
JSON.stringify(cwData),
"PX",
ttlMs,
);
return cwData;
} catch (err) {
if (isNotFoundError(err)) return null;
if (isTransientError(err)) {
console.warn(`[cache] opp#${cwOpportunityId}: ${describeError(err)}`);
return null;
}
throw err;
}
}
// ---------------------------------------------------------------------------
@@ -176,8 +590,10 @@ export async function refreshOpportunityCache(): Promise<void> {
closedDate: true,
expectedCloseDate: true,
cwLastUpdated: true,
statusCwId: true,
company: { select: { cw_CompanyId: true } },
},
orderBy: { cwLastUpdated: "desc" },
});
events.emit("cache:opportunities:refresh:started", {
@@ -186,25 +602,41 @@ export async function refreshOpportunityCache(): Promise<void> {
let activitiesRefreshed = 0;
let companiesRefreshed = 0;
let notesRefreshed = 0;
let contactsRefreshed = 0;
let productsRefreshed = 0;
let oppCwDataRefreshed = 0;
let skipped = 0;
// Batch-check which activity keys already exist via a pipeline
// Batch-check which keys already exist via a single pipeline
// (5 EXISTS per opportunity: oppCwData, activities, notes, contacts, products).
const pipeline = redis.pipeline();
for (const opp of opportunities) {
pipeline.exists(oppCwDataCacheKey(opp.cwOpportunityId));
pipeline.exists(activityCacheKey(opp.cwOpportunityId));
pipeline.exists(notesCacheKey(opp.cwOpportunityId));
pipeline.exists(contactsCacheKey(opp.cwOpportunityId));
pipeline.exists(productsCacheKey(opp.cwOpportunityId));
}
const existsResults = await pipeline.exec();
const refreshTasks: Promise<void>[] = [];
const refreshTasks: (() => Promise<void>)[] = [];
for (let i = 0; i < opportunities.length; i++) {
const opp = opportunities[i]!;
const ttl = computeCacheTTL({
const cacheTTLInput = {
closedFlag: opp.closedFlag,
closedDate: opp.closedDate,
expectedCloseDate: opp.expectedCloseDate,
lastUpdated: opp.cwLastUpdated,
};
const ttl = computeCacheTTL(cacheTTLInput);
const subTTL = computeSubResourceCacheTTL(cacheTTLInput);
const productsTTL = computeProductsCacheTTL({
...cacheTTLInput,
statusCwId: opp.statusCwId,
});
// Skip closed (ttl === null) — should not happen because of the query filter,
@@ -215,43 +647,108 @@ export async function refreshOpportunityCache(): Promise<void> {
}
// existsResults entries are [error, result] tuples
const activityExists = existsResults?.[i]?.[1] === 1;
// Pipeline order per opportunity: oppCwData, activities, notes, contacts, products
const baseIdx = i * 5;
const oppCwDataExists = existsResults?.[baseIdx]?.[1] === 1;
const activityExists = existsResults?.[baseIdx + 1]?.[1] === 1;
const notesExist = existsResults?.[baseIdx + 2]?.[1] === 1;
const contactsExist = existsResults?.[baseIdx + 3]?.[1] === 1;
const productsExist = existsResults?.[baseIdx + 4]?.[1] === 1;
// Proactively cache the CW opportunity response itself
if (!oppCwDataExists) {
refreshTasks.push(() =>
fetchAndCacheOppCwData(opp.cwOpportunityId, ttl).then(() => {
oppCwDataRefreshed++;
}),
);
}
if (!activityExists) {
refreshTasks.push(
refreshTasks.push(() =>
fetchAndCacheActivities(opp.cwOpportunityId, ttl).then(() => {
activitiesRefreshed++;
}),
);
}
// Refresh notes/contacts if sub-resource TTL applies and key is missing
if (subTTL !== null) {
if (!notesExist) {
refreshTasks.push(() =>
fetchAndCacheNotes(opp.cwOpportunityId, subTTL).then(() => {
notesRefreshed++;
}),
);
}
if (!contactsExist) {
refreshTasks.push(() =>
fetchAndCacheContacts(opp.cwOpportunityId, subTTL).then(() => {
contactsRefreshed++;
}),
);
}
}
// Proactively refresh products only for hot opps (updated within 3 days).
// 30-minute lazy-cached products are filled on-demand by the endpoint
// and do not need background refresh.
if (productsTTL === PRODUCTS_TTL_HOT && !productsExist) {
refreshTasks.push(() =>
fetchAndCacheProducts(opp.cwOpportunityId, productsTTL).then(() => {
productsRefreshed++;
}),
);
}
// Also refresh company CW data if the key is missing
if (opp.company?.cw_CompanyId) {
const cwCompanyId = opp.company.cw_CompanyId;
refreshTasks.push(
(async () => {
const companyExists = await redis.exists(
companyCwCacheKey(cwCompanyId),
);
if (!companyExists) {
await fetchAndCacheCompanyCwData(cwCompanyId, ttl);
companiesRefreshed++;
}
})(),
);
refreshTasks.push(async () => {
const companyExists = await redis.exists(
companyCwCacheKey(cwCompanyId),
);
if (!companyExists) {
await fetchAndCacheCompanyCwData(cwCompanyId, ttl);
companiesRefreshed++;
}
});
}
}
// Run all refresh tasks concurrently with bounded concurrency
const CONCURRENCY = 10;
// Run refresh thunks with bounded concurrency and inter-batch delay.
// Each thunk is only invoked here — no requests fire until we call them.
// CW rate-limits aggressively so we keep this conservative.
const CONCURRENCY = 6;
const BATCH_DELAY_MS = 250;
let timeoutCount = 0;
for (let i = 0; i < refreshTasks.length; i += CONCURRENCY) {
await Promise.allSettled(refreshTasks.slice(i, i + CONCURRENCY));
const batch = refreshTasks.slice(i, i + CONCURRENCY);
const results = await Promise.allSettled(batch.map((fn) => fn()));
for (const r of results) {
if (r.status === "rejected") timeoutCount++;
}
// Small delay between batches to avoid overwhelming CW
if (i + CONCURRENCY < refreshTasks.length) {
await new Promise((resolve) => setTimeout(resolve, BATCH_DELAY_MS));
}
}
if (timeoutCount > 0) {
console.warn(
`[cache] refresh: ${timeoutCount} task(s) failed (likely CW timeouts) — will retry next cycle`,
);
}
events.emit("cache:opportunities:refresh:completed", {
totalOpportunities: opportunities.length,
activitiesRefreshed,
companiesRefreshed,
notesRefreshed,
contactsRefreshed,
productsRefreshed,
oppCwDataRefreshed,
skipped,
});
}
@@ -115,6 +115,24 @@ export const activityCw = {
return activityCw.fetchAll(`opportunity/id=${opportunityId}`);
},
/**
* Fetch Activities by Opportunity (Direct)
*
* Lightweight single-call variant that skips the count request.
* Fetches up to 1000 activities in a single GET — sufficient for
* virtually all opportunities. Used by the background cache refresh
* to avoid doubling CW API calls.
*/
fetchByOpportunityDirect: async (
opportunityId: number,
): Promise<CWActivity[]> => {
const conditions = encodeURIComponent(`opportunity/id=${opportunityId}`);
const response = await connectWiseApi.get(
`/sales/activities?pageSize=1000&conditions=${conditions}`,
);
return response.data;
},
/**
* Create Activity
*
+142
View File
@@ -0,0 +1,142 @@
/**
* @module cwApiLogger
*
* Axios interceptor-based logger that records every ConnectWise API
* request to a JSONL (newline-delimited JSON) file for post-hoc analysis.
*
* Each line in the log file is a self-contained JSON object with:
* - timestamp (ISO-8601)
* - method, url, baseURL
* - status (HTTP status or null on network error)
* - durationMs (wall-clock time from request start → response/error)
* - error (error code / message, if any)
* - timeout (configured timeout in ms)
*
* Logging is **opt-in** — set the `LOG_CW_API` environment variable to
* any truthy value to enable it. When enabled, each process start creates
* a new timestamped file inside the `cw-api-logs/` directory:
*
* LOG_CW_API=1 bun run dev # uses cw-api-logs/<timestamp>.jsonl
* bun run dev:log # shorthand (sets LOG_CW_API=1)
*
* Appends are non-blocking (fire-and-forget) to avoid slowing down
* the actual API flow.
*
* Usage:
* import { attachCwApiLogger } from "./modules/cw-utils/cwApiLogger";
* attachCwApiLogger(connectWiseApi);
*/
import { appendFile, mkdir } from "fs/promises";
import path from "path";
import type { AxiosInstance, InternalAxiosRequestConfig } from "axios";
const LOG_DIR = path.resolve(process.cwd(), "cw-api-logs");
/** Build a timestamped filename like `2026-03-02T14-30-05.123Z.jsonl` */
function buildLogPath(): string {
const ts = new Date().toISOString().replace(/:/g, "-");
return path.join(LOG_DIR, `${ts}.jsonl`);
}
let LOG_PATH: string | null = null;
// Symbol used to stash the start time on the request config
const START_TIME = Symbol("cwLogStartTime");
interface TimedConfig extends InternalAxiosRequestConfig {
[START_TIME]?: number;
}
export interface CwApiLogEntry {
timestamp: string;
method: string;
url: string;
baseURL: string;
status: number | null;
durationMs: number;
error: string | null;
timeout: number | undefined;
}
/** Write a single log entry (fire-and-forget). */
function writeEntry(entry: CwApiLogEntry): void {
if (!LOG_PATH) return;
appendFile(LOG_PATH, JSON.stringify(entry) + "\n").catch((err) => {
// Swallow write errors — logging should never crash the app
console.error("[cw-logger] failed to write log entry:", err.message);
});
}
/**
* Attach request/response interceptors to an Axios instance to log
* every CW API call with timing information.
*/
export function attachCwApiLogger(api: AxiosInstance): void {
if (!process.env.LOG_CW_API) {
return;
}
// Create the log directory and build a unique file path for this run
LOG_PATH = buildLogPath();
mkdir(LOG_DIR, { recursive: true }).catch((err) => {
console.error("[cw-logger] failed to create log directory:", err.message);
});
// ---- Request interceptor: record start time --------------------------
api.interceptors.request.use((config: TimedConfig) => {
config[START_TIME] = performance.now();
return config;
});
// ---- Response interceptor: log successful calls ----------------------
api.interceptors.response.use(
(response) => {
const config = response.config as TimedConfig;
const start = config[START_TIME] ?? performance.now();
const durationMs = Math.round(performance.now() - start);
writeEntry({
timestamp: new Date().toISOString(),
method: (config.method ?? "GET").toUpperCase(),
url: config.url ?? "",
baseURL: config.baseURL ?? "",
status: response.status,
durationMs,
error: null,
timeout: config.timeout,
});
return response;
},
// ---- Error interceptor: log failed calls -----------------------------
(err) => {
const config = (err.config ?? {}) as TimedConfig;
const start = config[START_TIME] ?? performance.now();
const durationMs = Math.round(performance.now() - start);
writeEntry({
timestamp: new Date().toISOString(),
method: (config.method ?? "GET").toUpperCase(),
url: config.url ?? "",
baseURL: config.baseURL ?? "",
status: err.response?.status ?? null,
durationMs,
error: err.code
? `${err.code}: ${err.message}`
: (err.message ?? "unknown"),
timeout: config.timeout,
});
return Promise.reject(err);
},
);
console.log(`[cw-logger] logging CW API calls to ${LOG_PATH}`);
}
/** Returns the current log file path (or null if logging is disabled). */
export function getCwLogPath(): string | null {
return LOG_PATH;
}
+8 -2
View File
@@ -1,12 +1,18 @@
import { connectWiseApi } from "../../constants";
import { Company } from "../../types/ConnectWiseTypes";
import { withCwRetry } from "./withCwRetry";
export const fetchCwCompanyById = async (
companyId: number,
): Promise<Company | null> => {
try {
const response = await connectWiseApi.get(
`/company/companies/${companyId}`,
const response = await withCwRetry(
() => connectWiseApi.get(`/company/companies/${companyId}`),
{
label: `fetchCompany#${companyId}`,
maxAttempts: 3,
baseDelayMs: 1_500,
},
);
return response.data;
} catch (error) {
+90
View File
@@ -0,0 +1,90 @@
/**
* Generic retry wrapper for ConnectWise API calls.
*
* Retries on transient errors (ECONNABORTED, ECONNRESET, ETIMEDOUT,
* ECONNREFUSED, ERR_NETWORK) with exponential backoff. Non-transient
* errors (e.g. 404, 400) are re-thrown immediately.
*/
const TRANSIENT_CODES = new Set([
"ECONNABORTED",
"ECONNRESET",
"ETIMEDOUT",
"ECONNREFUSED",
"ERR_NETWORK",
"ENETUNREACH",
]);
export interface CwRetryOptions {
/** Maximum number of attempts (including the first). Default: 3 */
maxAttempts?: number;
/** Base delay in ms before the first retry. Doubles each retry. Default: 1000 */
baseDelayMs?: number;
/** Optional label for log messages. */
label?: string;
}
/**
* Execute `fn` and retry up to `maxAttempts - 1` times on transient
* Axios / network errors.
*/
export async function withCwRetry<T>(
fn: () => Promise<T>,
opts: CwRetryOptions = {},
): Promise<T> {
const { maxAttempts = 3, baseDelayMs = 1_000, label } = opts;
let lastError: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
if (!isRetryable(err) || attempt === maxAttempts) throw err;
const delay = baseDelayMs * 2 ** (attempt - 1); // 1s, 2s, 4s …
const tag = label ? `[${label}] ` : "";
console.warn(
`${tag}CW transient error (attempt ${attempt}/${maxAttempts}), retrying in ${delay}ms — ${describeErr(err)}`,
);
await sleep(delay);
}
}
// Should never reach here, but satisfy TS:
throw lastError;
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function isRetryable(err: unknown): boolean {
if (typeof err !== "object" || err === null) return false;
const e = err as Record<string, any>;
if (!e.isAxiosError) return false;
// Retry on known transient codes
if (TRANSIENT_CODES.has(e.code)) return true;
// Also retry on 5xx server errors from CW
const status = e.response?.status;
if (typeof status === "number" && status >= 500) return true;
return false;
}
function describeErr(err: unknown): string {
if (typeof err !== "object" || err === null) return String(err);
const e = err as Record<string, any>;
if (e.isAxiosError) {
const method = (e.config?.method ?? "?").toUpperCase();
const url = e.config?.url ?? "unknown";
return `${method} ${url}${e.code ?? `HTTP ${e.response?.status}`}`;
}
return (e as Error).message ?? String(err);
}
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
+4
View File
@@ -186,6 +186,10 @@ interface EventTypes {
totalOpportunities: number;
activitiesRefreshed: number;
companiesRefreshed: number;
notesRefreshed: number;
contactsRefreshed: number;
productsRefreshed: number;
oppCwDataRefreshed: number;
skipped: number;
}) => void;
"cache:opportunities:refresh:error": (data: { error: unknown }) => void;