perf: cache-only strategy for list views, cache-then-cw for single fetch
- Add data-source hierarchy to opportunity manager (cache-only, cache-then-cw, cw-first) - fetchPages/search/fetchByCompany use cache-only: Redis → DB (no CW calls) - fetchItem uses cache-then-cw by default, cw-first when fresh=true - Add idleTimeout: 255 to Bun.serve to prevent request timeouts - Map CW status 57 (04. Confirmed Quote) to Active equivalency - Add computeCacheTTL algorithm and opportunityCache module
This commit is contained in:
+257
@@ -0,0 +1,257 @@
|
||||
/**
|
||||
* @module opportunityCache
|
||||
*
|
||||
* Redis-backed cache for expensive ConnectWise API data associated
|
||||
* with opportunities.
|
||||
*
|
||||
* ## What is cached
|
||||
*
|
||||
* Each non-closed opportunity may have two cached payloads keyed by
|
||||
* its `cwOpportunityId`:
|
||||
*
|
||||
* - **Activities** (`opp:activities:{cwOpportunityId}`) — the raw
|
||||
* `CWActivity[]` array fetched from `activityCw.fetchByOpportunity()`.
|
||||
* - **Company CW data** (`opp:company-cw:{cw_CompanyId}`) — the hydrated
|
||||
* company / contacts blob set by `CompanyController.hydrateCwData()`.
|
||||
*
|
||||
* TTLs are computed dynamically via {@link computeCacheTTL} so hot
|
||||
* opportunities refresh every 30 s while stale ones live for 15 min.
|
||||
*
|
||||
* ## Background refresh
|
||||
*
|
||||
* {@link refreshOpportunityCache} is designed to be called on a
|
||||
* 30-second interval from `src/index.ts`. It scans all non-closed
|
||||
* DB opportunities, checks which cache keys have expired, and
|
||||
* re-fetches only those from ConnectWise.
|
||||
*/
|
||||
|
||||
import { prisma, redis } from "../../constants";
|
||||
import { activityCw } from "../cw-utils/activities/activities";
|
||||
import { computeCacheTTL } from "../algorithms/computeCacheTTL";
|
||||
import { connectWiseApi } from "../../constants";
|
||||
import { fetchCwCompanyById } from "../cw-utils/fetchCompany";
|
||||
import { events } from "../globalEvents";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Key helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const ACTIVITY_PREFIX = "opp:activities:";
|
||||
const COMPANY_CW_PREFIX = "opp:company-cw:";
|
||||
|
||||
/** Redis key for cached activities by CW opportunity ID. */
|
||||
export const activityCacheKey = (cwOppId: number) =>
|
||||
`${ACTIVITY_PREFIX}${cwOppId}`;
|
||||
|
||||
/** Redis key for cached company CW hydration data by CW company ID. */
|
||||
export const companyCwCacheKey = (cwCompanyId: number) =>
|
||||
`${COMPANY_CW_PREFIX}${cwCompanyId}`;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Read helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Retrieve cached CW activities for an opportunity.
|
||||
*
|
||||
* @returns The parsed `CWActivity[]` or `null` on cache miss.
|
||||
*/
|
||||
export async function getCachedActivities(
|
||||
cwOpportunityId: number,
|
||||
): Promise<any[] | null> {
|
||||
const raw = await redis.get(activityCacheKey(cwOpportunityId));
|
||||
if (!raw) return null;
|
||||
try {
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve cached company CW hydration data.
|
||||
*
|
||||
* @returns `{ company, defaultContact, allContacts }` or `null` on cache miss.
|
||||
*/
|
||||
export async function getCachedCompanyCwData(
|
||||
cwCompanyId: number,
|
||||
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
|
||||
const raw = await redis.get(companyCwCacheKey(cwCompanyId));
|
||||
if (!raw) return null;
|
||||
try {
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Write helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Fetch activities from CW and cache them with the appropriate TTL.
|
||||
*
|
||||
* @returns The raw `CWActivity[]` collection (as plain array).
|
||||
*/
|
||||
export async function fetchAndCacheActivities(
|
||||
cwOpportunityId: number,
|
||||
ttlMs: number,
|
||||
): Promise<any[]> {
|
||||
const collection = await activityCw.fetchByOpportunity(cwOpportunityId);
|
||||
const arr = collection.map((item) => item);
|
||||
await redis.set(
|
||||
activityCacheKey(cwOpportunityId),
|
||||
JSON.stringify(arr),
|
||||
"PX",
|
||||
ttlMs,
|
||||
);
|
||||
return arr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch company CW data (company, contacts) and cache with the given TTL.
|
||||
*
|
||||
* @returns The hydration blob or `null` if the company doesn't exist in CW.
|
||||
*/
|
||||
export async function fetchAndCacheCompanyCwData(
|
||||
cwCompanyId: number,
|
||||
ttlMs: number,
|
||||
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
|
||||
const cwCompany = await fetchCwCompanyById(cwCompanyId);
|
||||
if (!cwCompany) return null;
|
||||
|
||||
const contactHref = cwCompany.defaultContact?._info?.contact_href;
|
||||
const defaultContactData = contactHref
|
||||
? await connectWiseApi.get(contactHref)
|
||||
: undefined;
|
||||
|
||||
const allContactsData = await connectWiseApi.get(
|
||||
`${cwCompany._info.contacts_href}&pageSize=1000`,
|
||||
);
|
||||
|
||||
const blob = {
|
||||
company: cwCompany,
|
||||
defaultContact: defaultContactData?.data ?? null,
|
||||
allContacts: allContactsData.data,
|
||||
};
|
||||
|
||||
await redis.set(
|
||||
companyCwCacheKey(cwCompanyId),
|
||||
JSON.stringify(blob),
|
||||
"PX",
|
||||
ttlMs,
|
||||
);
|
||||
|
||||
return blob;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Background refresh
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Refresh the opportunity cache.
|
||||
*
|
||||
* Scans all non-closed opportunities in the database, computes a TTL for each,
|
||||
* checks whether the cache key still exists, and re-fetches from ConnectWise
|
||||
* only for entries that have expired.
|
||||
*
|
||||
* Designed to be called every **30 seconds** from the process entry point.
|
||||
*/
|
||||
export async function refreshOpportunityCache(): Promise<void> {
|
||||
// Include non-closed AND recently-closed (within 30 days) opportunities
|
||||
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const opportunities = await prisma.opportunity.findMany({
|
||||
where: {
|
||||
OR: [
|
||||
{ closedFlag: false },
|
||||
{ closedFlag: true, closedDate: { gte: thirtyDaysAgo } },
|
||||
],
|
||||
},
|
||||
select: {
|
||||
cwOpportunityId: true,
|
||||
closedFlag: true,
|
||||
closedDate: true,
|
||||
expectedCloseDate: true,
|
||||
cwLastUpdated: true,
|
||||
company: { select: { cw_CompanyId: true } },
|
||||
},
|
||||
});
|
||||
|
||||
events.emit("cache:opportunities:refresh:started", {
|
||||
totalOpportunities: opportunities.length,
|
||||
});
|
||||
|
||||
let activitiesRefreshed = 0;
|
||||
let companiesRefreshed = 0;
|
||||
let skipped = 0;
|
||||
|
||||
// Batch-check which activity keys already exist via a pipeline
|
||||
const pipeline = redis.pipeline();
|
||||
for (const opp of opportunities) {
|
||||
pipeline.exists(activityCacheKey(opp.cwOpportunityId));
|
||||
}
|
||||
const existsResults = await pipeline.exec();
|
||||
|
||||
const refreshTasks: Promise<void>[] = [];
|
||||
|
||||
for (let i = 0; i < opportunities.length; i++) {
|
||||
const opp = opportunities[i]!;
|
||||
|
||||
const ttl = computeCacheTTL({
|
||||
closedFlag: opp.closedFlag,
|
||||
closedDate: opp.closedDate,
|
||||
expectedCloseDate: opp.expectedCloseDate,
|
||||
lastUpdated: opp.cwLastUpdated,
|
||||
});
|
||||
|
||||
// Skip closed (ttl === null) — should not happen because of the query filter,
|
||||
// but guard anyway.
|
||||
if (ttl === null) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// existsResults entries are [error, result] tuples
|
||||
const activityExists = existsResults?.[i]?.[1] === 1;
|
||||
|
||||
if (!activityExists) {
|
||||
refreshTasks.push(
|
||||
fetchAndCacheActivities(opp.cwOpportunityId, ttl).then(() => {
|
||||
activitiesRefreshed++;
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// Also refresh company CW data if the key is missing
|
||||
if (opp.company?.cw_CompanyId) {
|
||||
const cwCompanyId = opp.company.cw_CompanyId;
|
||||
refreshTasks.push(
|
||||
(async () => {
|
||||
const companyExists = await redis.exists(
|
||||
companyCwCacheKey(cwCompanyId),
|
||||
);
|
||||
if (!companyExists) {
|
||||
await fetchAndCacheCompanyCwData(cwCompanyId, ttl);
|
||||
companiesRefreshed++;
|
||||
}
|
||||
})(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Run all refresh tasks concurrently with bounded concurrency
|
||||
const CONCURRENCY = 10;
|
||||
for (let i = 0; i < refreshTasks.length; i += CONCURRENCY) {
|
||||
await Promise.allSettled(refreshTasks.slice(i, i + CONCURRENCY));
|
||||
}
|
||||
|
||||
events.emit("cache:opportunities:refresh:completed", {
|
||||
totalOpportunities: opportunities.length,
|
||||
activitiesRefreshed,
|
||||
companiesRefreshed,
|
||||
skipped,
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user