Files
optima/src/modules/cache/salesOpportunityMetricsCache.ts
T

901 lines
27 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { prisma, redis } from "../../constants";
import { getCachedOppCwData, getCachedProducts } from "./opportunityCache";
import { OpportunityStatus } from "../../workflows/wf.opportunity";
import { events } from "../globalEvents";
import { opportunities } from "../../managers/opportunities";
import { normalizeProbabilityRatio } from "../sales-utils/normalizeProbability";
// Expiry (milliseconds) applied to every key this module writes: the
// all-members envelope, the per-member entries and the per-opportunity
// revenue entries (10 minutes).
const METRICS_CACHE_TTL_MS = 10 * 60 * 1000;
// Redis key holding the full SalesMetricsCacheEnvelope.
const ALL_MEMBERS_KEY = "sales:metrics:members:all";
// Prefix of the per-member metrics keys (see memberKey).
const MEMBER_KEY_PREFIX = "sales:metrics:member:";
// Prefix of the per-opportunity revenue keys (see oppRevenueKey).
const OPP_REVENUE_KEY_PREFIX = "sales:metrics:oppRevenue:";
// Number of opportunity revenue lookups allowed to run at the same time
// during a refresh.
const PRODUCT_FETCH_CONCURRENCY = 6;
// Upper bound (milliseconds) for a single opportunity revenue lookup before
// the refresh gives up on it and records zero revenue.
const PRODUCT_LOOKUP_TIMEOUT_MS = 35_000;
// Tag prepended to every line emitted by log().
const LOG_PREFIX = "[cache:salesMetrics]";
/**
 * Prints one line to stdout in the form
 * `<LOG_PREFIX> <ISO-8601 timestamp> <message>`.
 */
const log = (message: string) => {
  const parts = [LOG_PREFIX, new Date().toISOString(), message];
  console.log(parts.join(" "));
};
// Promise of the refresh that is currently running, or null when no refresh
// is in progress. refreshSalesOpportunityMetricsCache() returns this promise
// to callers that arrive mid-run and resets it to null once the run settles.
let salesMetricsRefreshInFlight: Promise<void> | null = null;
/**
 * Redis key of one member's metrics entry. The identifier is lowercased, so
 * the key is the same regardless of the casing the caller used.
 */
const memberKey = (identifier: string) => {
  const lowered = identifier.toLowerCase();
  return MEMBER_KEY_PREFIX + lowered;
};
/** Redis key of the cached revenue totals for one opportunity. */
const oppRevenueKey = (cwOpportunityId: number) => {
  return OPP_REVENUE_KEY_PREFIX + String(cwOpportunityId);
};
/**
 * Deletes every Redis key that starts with `prefix`.
 *
 * Keys are discovered page by page with SCAN instead of KEYS: KEYS walks the
 * whole keyspace in a single blocking command, and spreading its full result
 * into one DEL call can exceed the argument limit on large keyspaces. Each
 * page is deleted as soon as it is read, so the work stays bounded.
 *
 * @param prefix Literal key prefix; `*` is appended to form the match pattern.
 * @returns Number of keys actually removed (sum of the DEL replies).
 */
const deleteKeysByPrefix = async (prefix: string) => {
  const pattern = `${prefix}*`;
  let cursor = "0";
  let deleted = 0;
  do {
    const [nextCursor, keys] = await redis.scan(
      cursor,
      "MATCH",
      pattern,
      "COUNT",
      500,
    );
    cursor = nextCursor;
    if (keys.length > 0) {
      // DEL reports how many keys really existed, so a key that SCAN happens
      // to return twice is only counted once.
      deleted += await redis.del(...keys);
    }
  } while (cursor !== "0"); // SCAN signals the end of iteration with cursor "0"
  return deleted;
};
/**
 * One opportunity as listed in a member's `opportunityBreakdown`. Built once
 * per assigned opportunity during a refresh; the same entry object may appear
 * in more than one breakdown list (e.g. both MTD and YTD).
 */
export interface OpportunityBreakdownEntry {
/** The opportunity row's `id`. */
id: string;
/** The opportunity row's `cwOpportunityId`. */
cwId: number;
/** The opportunity row's `name`. */
name: string;
/** Total revenue resolved for the opportunity (already rounded to 2 decimals). */
revenue: number;
/** Portion of `revenue` coming from taxable items. */
taxableRevenue: number;
/** Portion of `revenue` coming from non-taxable items. */
nonTaxableRevenue: number;
/** Probability as a 0-100 percent value */
probability: number;
/** `revenue` multiplied by the probability ratio (0..1), rounded to 2 decimals. */
weightedRevenue: number;
/** ISO-8601 string of the opportunity's closed date, or null when it has none. */
closedDate: string | null;
}
/**
 * Aggregated sales metrics for one active member, as written to Redis by
 * refreshSalesOpportunityMetricsCache(). "MTD"/"YTD" buckets are decided by
 * comparing an opportunity's closed date with the UTC start of the current
 * month/year. Monetary values, averages and rates are rounded to 2 decimals.
 */
export interface MemberSalesMetrics {
/** The member's `identifier`. */
memberIdentifier: string;
/** "firstName lastName" (trimmed); falls back to the identifier when empty. */
memberName: string;
/** ISO-8601 timestamp of the refresh run that produced this record. */
generatedAt: string;
/** Sum of total revenue over assigned opportunities that are not closed. */
pipelineRevenue: number;
/** Total revenue of won opportunities closed since the start of the month. */
closedWonRevenueMtd: number;
/** Total revenue of won opportunities closed since the start of the year. */
closedWonRevenueYtd: number;
/** Number of won opportunities with a closed date in each period. */
winCount: { mtd: number; ytd: number };
/** Number of lost opportunities with a closed date in each period. */
lossCount: { mtd: number; ytd: number };
/** Mean days from `dateBecameLead` to `closedDate` over YTD won opportunities; 0 when there are none. */
avgDaysToClose: number;
/** Number of assigned opportunities that are not closed. */
openOpportunityCount: number;
/** Incremented together with `winCount`, so it carries the same values. */
wonOpportunityCount: { mtd: number; ytd: number };
/** Incremented together with `lossCount`, so it carries the same values. */
lostOpportunityCount: { mtd: number; ytd: number };
/** Won plus lost opportunities per period; closed opportunities that are neither won nor lost are not counted. */
closedOpportunityCount: { mtd: number; ytd: number };
/** Sum over open opportunities of total revenue multiplied by the probability ratio (0..1). */
weightedPipelineRevenue: number;
/** Taxable share of `pipelineRevenue`. */
taxablePipelineRevenue: number;
/** Non-taxable share of `pipelineRevenue`. */
nonTaxablePipelineRevenue: number;
/** `pipelineRevenue / openOpportunityCount`; 0 when there are no open opportunities. */
avgOpenDealSize: number;
/** Closed-won revenue divided by win count per period; 0 when there are no wins. */
avgWonDealSize: { mtd: number; ytd: number };
/** wins / (wins + losses) per period as a 0..1 ratio; 0 when nothing was won or lost. */
winRate: { mtd: number; ytd: number };
/** losses / (wins + losses) per period as a 0..1 ratio; 0 when nothing was won or lost. */
lossRate: { mtd: number; ytd: number };
/** Number of fetched opportunities on which the member is primary or secondary sales rep. */
assignedOpportunityCount: number;
/** Assigned opportunities whose revenue came from a cache. */
cacheHitCount: number;
/** Assigned opportunities whose revenue did not come from a cache (includes failed and timed-out lookups). */
cacheMissCount: number;
/** `cacheHitCount / (cacheHitCount + cacheMissCount)` as a 0..1 ratio. */
cacheHitRate: number;
/** Per-opportunity detail behind the aggregates above. */
opportunityBreakdown: {
/** Opportunities that are not closed. */
pipeline: OpportunityBreakdownEntry[];
/** Won opportunities closed since the start of the month. */
closedWonMtd: OpportunityBreakdownEntry[];
/** Won opportunities closed since the start of the year. */
closedWonYtd: OpportunityBreakdownEntry[];
/** Lost opportunities closed since the start of the month. */
closedLostMtd: OpportunityBreakdownEntry[];
/** Lost opportunities closed since the start of the year. */
closedLostYtd: OpportunityBreakdownEntry[];
};
}
/**
 * Payload stored under the all-members Redis key: the metrics of every active
 * member produced by a single refresh run.
 */
export interface SalesMetricsCacheEnvelope {
/** ISO-8601 timestamp of the refresh run. */
generatedAt: string;
/** Number of active members found by the refresh. */
activeMemberCount: number;
/** Identifiers of the active members, as returned by the database. */
memberIdentifiers: string[];
/** Metrics keyed by member identifier (original casing, not lowercased). */
members: Record<string, MemberSalesMetrics>;
}
/** Revenue totals resolved for one opportunity, plus where they came from. */
interface OpportunityRevenue {
/** Revenue summed over the opportunity's counted items. */
totalRevenue: number;
/** Portion of `totalRevenue` from items flagged taxable. */
taxableRevenue: number;
/** `totalRevenue` minus `taxableRevenue`. */
nonTaxableRevenue: number;
/** True when the totals were served from cached revenue or cached products rather than a fresh fetch. */
cacheHit: boolean;
}
/** Shape returned when reading a per-opportunity revenue entry back from Redis. */
interface CachedOpportunityRevenue {
/** Revenue summed over the opportunity's counted items. */
totalRevenue: number;
/** Portion of `totalRevenue` from items flagged taxable. */
taxableRevenue: number;
/** `totalRevenue` minus `taxableRevenue`. */
nonTaxableRevenue: number;
}
/**
 * Columns selected from `prisma.opportunity` during a refresh; this is the
 * only opportunity data the aggregation reads.
 */
interface OpportunityRow {
/** Row id; used as the key of the per-run revenue and probability maps. */
id: string;
/** Id passed to the revenue and probability lookups and used in revenue cache keys. */
cwOpportunityId: number;
/** Opportunity name, copied into breakdown entries. */
name: string;
/** Identifier of the primary sales rep, if any. */
primarySalesRepIdentifier: string | null;
/** Identifier of the secondary sales rep, if any. */
secondarySalesRepIdentifier: string | null;
/** Status id, compared against `OpportunityStatus.Won` / `OpportunityStatus.Lost`. */
statusCwId: number | null;
/** Status name; searched case-insensitively for "won" / "lost". */
statusName: string | null;
/** True when the opportunity is flagged closed. */
closedFlag: boolean;
/** Start date used for the days-to-close calculation. */
dateBecameLead: Date | null;
/** Close date; decides the MTD/YTD buckets. */
closedDate: Date | null;
/** Stored probability; normalised to a 0..1 ratio by normalizeProbabilityRatio(). */
probability: number;
}
/** Options accepted by the refresh and by the per-opportunity revenue lookup. */
interface RefreshSalesOpportunityMetricsCacheOptions {
/**
 * When true, the refresh first deletes the cached member, revenue and
 * envelope keys, and every revenue lookup skips both caches and fetches
 * products with `fresh: true`.
 */
forceColdLoad?: boolean;
}
/**
 * Rounds a number to two decimal places by scaling to cents, applying
 * Math.round, and scaling back.
 */
const roundCurrency = (value: number) => {
  const cents = Math.round(value * 100);
  return cents / 100;
};
/**
 * Number of (fractional) days from `start` to `end`.
 *
 * @returns The elapsed days, or 0 when `end` is earlier than `start`.
 */
const daysBetween = (start: Date, end: Date): number => {
  const elapsedMs = end.getTime() - start.getTime();
  return Math.max(0, elapsedMs / 86_400_000);
};
/** Midnight UTC on the first day of the UTC month that contains `input`. */
const startOfMonthUtc = (input: Date): Date => {
  const year = input.getUTCFullYear();
  const month = input.getUTCMonth();
  return new Date(Date.UTC(year, month, 1));
};
/** Midnight UTC on 1 January of the UTC year that contains `input`. */
const startOfYearUtc = (input: Date): Date => {
  const year = input.getUTCFullYear();
  return new Date(Date.UTC(year, 0, 1));
};
/**
 * Coerces any value to a number with `Number(value)`.
 *
 * @returns The coerced number, or 0 when the result is NaN or infinite.
 */
const toFinite = (value: unknown): number => {
  const numeric = Number(value);
  return Number.isFinite(numeric) ? numeric : 0;
};
/**
 * Decides whether an opportunity counts as won.
 *
 * An opportunity is won when its status id equals `OpportunityStatus.Won`, or
 * when its status name contains "won" (case-insensitive). The previous third
 * check (`closedFlag` plus the same name test) could never change the result
 * because the name test alone already returned true, so it was removed.
 * `closedFlag` stays in the parameter type so existing callers are unaffected.
 *
 * NOTE(review): the name test is a plain substring match, so any status name
 * that merely contains the letters "won" also matches — confirm that is
 * acceptable for the status names in use.
 */
const isWon = (opp: {
  statusCwId: number | null;
  statusName: string | null;
  closedFlag: boolean;
}) => {
  if (opp.statusCwId === OpportunityStatus.Won) return true;
  return opp.statusName?.toLowerCase().includes("won") ?? false;
};
/**
 * Decides whether an opportunity counts as lost.
 *
 * An opportunity is lost when its status id equals `OpportunityStatus.Lost`,
 * or when its status name contains "lost" (case-insensitive). The previous
 * third check (`closedFlag` plus the same name test) could never change the
 * result because the name test alone already returned true, so it was
 * removed. `closedFlag` stays in the parameter type so existing callers are
 * unaffected.
 */
const isLost = (opp: {
  statusCwId: number | null;
  statusName: string | null;
  closedFlag: boolean;
}) => {
  if (opp.statusCwId === OpportunityStatus.Lost) return true;
  return opp.statusName?.toLowerCase().includes("lost") ?? false;
};
/**
 * An opportunity is treated as closed when it is flagged closed, or when its
 * status says it was won or lost (even if the flag is not set).
 */
const isClosedOpportunity = (opp: {
  statusCwId: number | null;
  statusName: string | null;
  closedFlag: boolean;
}) => {
  if (opp.closedFlag) return true;
  return isWon(opp) || isLost(opp);
};
/**
 * Indexes procurement-product records by their `forecastDetailId`.
 *
 * The id is coerced to a number when it is not one already; records whose id
 * is not a finite number greater than zero are left out. When several records
 * share an id, the last one wins.
 */
const buildCancellationMap = (procProducts: any[]) => {
  const byForecastDetailId = new Map<number, any>();
  for (const procProduct of procProducts) {
    const rawId = procProduct?.forecastDetailId;
    const numericId = typeof rawId === "number" ? rawId : Number(rawId);
    const usable = Number.isFinite(numericId) && numericId > 0;
    if (!usable) continue;
    byForecastDetailId.set(numericId, procProduct);
  }
  return byForecastDetailId;
};
/**
 * Computes revenue totals from a cached products blob.
 *
 * Reads `blob.forecast.forecastItems` and `blob.procProducts` (either may be
 * absent, in which case it is treated as empty). A forecast item contributes
 * only when it has `includeFlag` set AND a procurement-product record exists
 * for its `id`. Fully cancelled items are skipped; partially cancelled items
 * contribute revenue scaled by the share of quantity that remains.
 *
 * NOTE(review): items without a matching procurement-product record are
 * dropped entirely, while computeRevenueFromControllers has no such filter —
 * confirm this difference is intended.
 *
 * @returns Totals rounded to 2 decimals.
 */
const computeRevenueFromProductsBlob = (
  blob: any,
): Omit<OpportunityRevenue, "cacheHit"> => {
  const rawItems = blob?.forecast?.forecastItems;
  const rawProcProducts = blob?.procProducts;
  const forecastItems: any[] = Array.isArray(rawItems) ? rawItems : [];
  const cancellations = buildCancellationMap(
    Array.isArray(rawProcProducts) ? rawProcProducts : [],
  );

  let grandTotal = 0;
  let taxableTotal = 0;
  for (const item of forecastItems) {
    const counted = cancellations.has(item?.id) && Boolean(item?.includeFlag);
    if (!counted) continue;

    const cancellation = cancellations.get(item.id);
    const quantity = Math.max(0, toFinite(item?.quantity));
    const revenue = toFinite(item?.revenue);
    const quantityCancelled = Math.max(
      0,
      toFinite(cancellation?.quantityCancelled),
    );

    // Flagged cancelled with nothing left of a positive quantity: skip.
    const fullyCancelled =
      Boolean(cancellation?.cancelledFlag) &&
      quantity > 0 &&
      quantityCancelled >= quantity;
    if (fullyCancelled) continue;

    // Share of the quantity still on order; a zero quantity keeps full revenue.
    let remainingShare = 1;
    if (quantity > 0) {
      remainingShare = Math.max(0, (quantity - quantityCancelled) / quantity);
    }

    const effectiveRevenue = revenue * remainingShare;
    grandTotal += effectiveRevenue;
    if (item?.taxableFlag) taxableTotal += effectiveRevenue;
  }

  return {
    totalRevenue: roundCurrency(grandTotal),
    taxableRevenue: roundCurrency(taxableTotal),
    nonTaxableRevenue: roundCurrency(grandTotal - taxableTotal),
  };
};
/**
 * Computes revenue totals from freshly fetched product records.
 *
 * A product contributes when it has `includeFlag` set and is not fully
 * cancelled. Its `effectiveRevenue` is coerced to a finite number and clamped
 * at zero before being added.
 *
 * @returns Totals rounded to 2 decimals.
 */
const computeRevenueFromControllers = (
  products: Array<{
    includeFlag: boolean;
    taxableFlag: boolean;
    cancellationType: "full" | "partial" | null;
    effectiveRevenue: number;
  }>,
): Omit<OpportunityRevenue, "cacheHit"> => {
  const sums = { total: 0, taxable: 0 };
  for (const product of products) {
    const counted = product.includeFlag && product.cancellationType !== "full";
    if (!counted) continue;
    const amount = Math.max(0, toFinite(product.effectiveRevenue));
    sums.total += amount;
    if (product.taxableFlag) sums.taxable += amount;
  }
  return {
    totalRevenue: roundCurrency(sums.total),
    taxableRevenue: roundCurrency(sums.taxable),
    nonTaxableRevenue: roundCurrency(sums.total - sums.taxable),
  };
};
/**
 * Reads the cached revenue totals of one opportunity from Redis.
 *
 * @returns The stored totals with each field coerced to a finite number, or
 *          null when the key is missing/empty or its content cannot be read
 *          as a JSON object.
 */
const readCachedOpportunityRevenue = async (
  cwOpportunityId: number,
): Promise<CachedOpportunityRevenue | null> => {
  const serialized = await redis.get(oppRevenueKey(cwOpportunityId));
  if (!serialized) return null;
  try {
    // Destructuring a non-object (e.g. JSON `null`) throws and is handled
    // below exactly like malformed JSON.
    const { totalRevenue, taxableRevenue, nonTaxableRevenue } = JSON.parse(
      serialized,
    ) as CachedOpportunityRevenue;
    return {
      totalRevenue: toFinite(totalRevenue),
      taxableRevenue: toFinite(taxableRevenue),
      nonTaxableRevenue: toFinite(nonTaxableRevenue),
    };
  } catch {
    return null;
  }
};
/**
 * Stores the revenue totals of one opportunity in Redis as JSON, expiring
 * after METRICS_CACHE_TTL_MS milliseconds.
 */
const writeCachedOpportunityRevenue = async (
  cwOpportunityId: number,
  revenue: Omit<OpportunityRevenue, "cacheHit">,
) => {
  const key = oppRevenueKey(cwOpportunityId);
  const payload = JSON.stringify(revenue);
  await redis.set(key, payload, "PX", METRICS_CACHE_TTL_MS);
};
/**
 * Resolves the win probability of an opportunity as a ratio.
 *
 * The stored probability is used when it normalises to a value above zero.
 * Otherwise the cached opportunity data is consulted, reading
 * `probability.name` first and `probability` itself second.
 *
 * @returns The normalised ratio, or 0 when neither source yields a value.
 */
const resolveProbabilityRatio = async (opp: {
  cwOpportunityId: number;
  probability: number;
}): Promise<number> => {
  const storedRatio = normalizeProbabilityRatio(opp.probability);
  if (storedRatio > 0) {
    return storedRatio;
  }

  const cwData = await getCachedOppCwData(opp.cwOpportunityId);
  if (!cwData) {
    return 0;
  }

  const cwProbability = cwData?.probability?.name ?? cwData?.probability ?? 0;
  return normalizeProbabilityRatio(cwProbability);
};
/**
 * Resolves the revenue totals of one opportunity, preferring cached data.
 *
 * Lookup order (the two cache steps are skipped when `opts.forceColdLoad` is
 * set):
 *   1. the per-opportunity revenue entry in Redis;
 *   2. the cached products blob, whose computed totals are written back to
 *      the revenue entry;
 *   3. a fetch of the opportunity and its products, whose computed totals are
 *      also written back.
 *
 * Step 3 is best-effort: if it throws, zero revenue is returned so one bad
 * opportunity cannot abort a whole refresh. The error is now reported on
 * stderr instead of being discarded, because a silent zero is otherwise
 * indistinguishable from an opportunity that genuinely has no revenue.
 *
 * @returns The totals plus `cacheHit`, which is true only for steps 1 and 2.
 */
const getOpportunityRevenueCacheFirst = async (
  cwOpportunityId: number,
  opts?: RefreshSalesOpportunityMetricsCacheOptions,
): Promise<OpportunityRevenue> => {
  if (!opts?.forceColdLoad) {
    const cachedRevenue = await readCachedOpportunityRevenue(cwOpportunityId);
    if (cachedRevenue) {
      return { ...cachedRevenue, cacheHit: true };
    }

    const cachedProducts = await getCachedProducts(cwOpportunityId);
    if (cachedProducts) {
      const computed = computeRevenueFromProductsBlob(cachedProducts);
      await writeCachedOpportunityRevenue(cwOpportunityId, computed);
      return { ...computed, cacheHit: true };
    }
  }

  try {
    const opportunity = await opportunities.fetchRecord(cwOpportunityId);
    const products = await opportunity.fetchProducts({
      fresh: opts?.forceColdLoad,
    });
    const computed = computeRevenueFromControllers(products);
    await writeCachedOpportunityRevenue(cwOpportunityId, computed);
    return { ...computed, cacheHit: false };
  } catch (error: unknown) {
    const reason = error instanceof Error ? error.message : String(error);
    console.error(
      `[cache:salesMetrics] ${new Date().toISOString()} revenue lookup failed for cwOpportunityId=${cwOpportunityId}; using zero revenue: ${reason}`,
    );
    return {
      totalRevenue: 0,
      taxableRevenue: 0,
      nonTaxableRevenue: 0,
      cacheHit: false,
    };
  }
};
/**
 * Settles with `promise`, or rejects with `Error("Timeout")` if it has not
 * settled within `timeoutMs` milliseconds.
 *
 * The timer is cancelled as soon as the race is decided. Previously it was
 * left running, so every call kept a pending timer (and the process) alive
 * for the full timeout even when the work finished immediately.
 *
 * The underlying work is not cancelled on timeout; only the wait is abandoned.
 *
 * @param promise   Work to wait for.
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @throws Error with message "Timeout" when the time limit is reached first;
 *         otherwise whatever `promise` rejects with.
 */
const withTimeout = async <T>(
  promise: Promise<T>,
  timeoutMs: number,
): Promise<T> => {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const expiry = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("Timeout")), timeoutMs);
  });
  try {
    return await Promise.race([promise, expiry]);
  } finally {
    clearTimeout(timer);
  }
};
/**
 * Maps `items` through an async `mapper`, running at most `concurrency`
 * mapper calls at the same time. Results keep the order of `items`.
 *
 * The worker count is clamped to at least one. Previously a concurrency of 0,
 * a negative number or NaN started no workers at all, so the function
 * resolved with an array of empty slots without ever calling `mapper`.
 *
 * @param items       Inputs to map.
 * @param concurrency Maximum number of mapper calls in flight; fractional
 *                    values are rounded down, values below 1 are treated as 1.
 * @param mapper      Async transform applied to each item.
 * @returns Mapped values, index-aligned with `items`.
 * @throws Rejects with the first error thrown by `mapper`.
 */
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  mapper: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  if (items.length === 0) return results;

  const requested = Number.isNaN(concurrency) ? 1 : Math.floor(concurrency);
  const workerCount = Math.min(items.length, Math.max(1, requested));

  // Shared cursor: each worker claims the next unprocessed index. Claiming is
  // synchronous, so two workers can never take the same index.
  let nextIndex = 0;
  const worker = async () => {
    while (nextIndex < items.length) {
      const current = nextIndex;
      nextIndex += 1;
      results[current] = await mapper(items[current]!);
    }
  };

  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
/**
 * Creates a metrics record for one member with every counter, total and rate
 * at zero and every breakdown list empty.
 *
 * Every nested object and array is a fresh instance, because the refresh
 * mutates them in place while aggregating.
 */
const buildEmptyMetrics = (
  memberIdentifier: string,
  memberName: string,
  generatedAt: string,
): MemberSalesMetrics => {
  const zeroPerPeriod = () => ({ mtd: 0, ytd: 0 });
  return {
    memberIdentifier,
    memberName,
    generatedAt,
    pipelineRevenue: 0,
    closedWonRevenueMtd: 0,
    closedWonRevenueYtd: 0,
    winCount: zeroPerPeriod(),
    lossCount: zeroPerPeriod(),
    avgDaysToClose: 0,
    openOpportunityCount: 0,
    wonOpportunityCount: zeroPerPeriod(),
    lostOpportunityCount: zeroPerPeriod(),
    closedOpportunityCount: zeroPerPeriod(),
    weightedPipelineRevenue: 0,
    taxablePipelineRevenue: 0,
    nonTaxablePipelineRevenue: 0,
    avgOpenDealSize: 0,
    avgWonDealSize: zeroPerPeriod(),
    winRate: zeroPerPeriod(),
    lossRate: zeroPerPeriod(),
    assignedOpportunityCount: 0,
    cacheHitCount: 0,
    cacheMissCount: 0,
    cacheHitRate: 0,
    opportunityBreakdown: {
      pipeline: [],
      closedWonMtd: [],
      closedWonYtd: [],
      closedLostMtd: [],
      closedLostYtd: [],
    },
  };
};
/**
 * Rebuilds the sales metrics of every active member and writes them to Redis.
 *
 * Phases of one run:
 *   1. (cold mode only) delete the cached member, revenue and envelope keys;
 *   2. load active members and the opportunities assigned to them;
 *   3. resolve revenue and probability per opportunity, with bounded
 *      concurrency and a per-lookup timeout (failed or timed-out lookups
 *      count as zero revenue and a cache miss);
 *   4. aggregate per member;
 *   5. write the all-members envelope plus one key per member in a single
 *      Redis pipeline, each expiring after METRICS_CACHE_TTL_MS.
 *
 * Events emitted on `events`: `cache:salesMetrics:refresh:started`,
 * `cache:salesMetrics:refresh:completed`, `cache:salesMetrics:refresh:error`.
 *
 * Only one run is active at a time: a call made while a run is in flight
 * returns that run's promise. The in-flight run keeps its own mode, so a
 * `forceColdLoad` request arriving during a warm run is served by the warm
 * run.
 *
 * Changes from the previous version:
 *   - the cold-mode key reset now happens inside the error-handling block, so
 *     a Redis failure there is logged and reported through the error event
 *     like any other failure;
 *   - per-command failures reported by the write pipeline are logged instead
 *     of being ignored;
 *   - the member progress counter no longer rebuilds the key list on every
 *     iteration.
 *
 * @param opts Optional settings; see `forceColdLoad`.
 * @throws Re-throws any error raised while refreshing, after logging it and
 *         emitting the error event.
 */
export async function refreshSalesOpportunityMetricsCache(
  opts?: RefreshSalesOpportunityMetricsCacheOptions,
): Promise<void> {
  if (salesMetricsRefreshInFlight) {
    log(
      "refresh requested while previous run is still in-flight; reusing existing run",
    );
    return salesMetricsRefreshInFlight;
  }

  salesMetricsRefreshInFlight = (async () => {
    const startedAt = Date.now();
    const forceColdLoad = opts?.forceColdLoad === true;
    log(`refresh started${forceColdLoad ? " | mode=cold" : " | mode=warm"}`);

    try {
      // ---- Phase 1: cold-mode reset --------------------------------------
      if (forceColdLoad) {
        const [deletedMemberKeys, deletedRevenueKeys] = await Promise.all([
          deleteKeysByPrefix(MEMBER_KEY_PREFIX),
          deleteKeysByPrefix(OPP_REVENUE_KEY_PREFIX),
          redis.del(ALL_MEMBERS_KEY),
        ]);
        log(
          `cold-load reset completed: memberKeysCleared=${deletedMemberKeys} oppRevenueKeysCleared=${deletedRevenueKeys}`,
        );
      }

      const now = new Date();
      const generatedAt = now.toISOString();
      const monthStart = startOfMonthUtc(now);
      const yearStart = startOfYearUtc(now);

      // ---- Phase 2: load members and opportunities -----------------------
      const activeMembers = await prisma.cwMember.findMany({
        where: { inactiveFlag: false },
        select: {
          identifier: true,
          firstName: true,
          lastName: true,
        },
      });
      const memberIdentifiers = activeMembers.map(
        (member) => member.identifier,
      );
      log(`members fetched: activeMembers=${memberIdentifiers.length}`);

      // Opportunities assigned to an active member that became a lead this
      // year and are either still open or were closed this year.
      // NOTE(review): the dateBecameLead filter also excludes open
      // opportunities that became leads in an earlier year — confirm that the
      // pipeline figures are meant to leave those out.
      const opportunityRows: OpportunityRow[] =
        await prisma.opportunity.findMany({
          where: {
            AND: [
              {
                OR: [
                  { primarySalesRepIdentifier: { in: memberIdentifiers } },
                  { secondarySalesRepIdentifier: { in: memberIdentifiers } },
                ],
              },
              { dateBecameLead: { gte: yearStart } },
              {
                OR: [{ closedFlag: false }, { closedDate: { gte: yearStart } }],
              },
            ],
          },
          select: {
            id: true,
            cwOpportunityId: true,
            name: true,
            primarySalesRepIdentifier: true,
            secondarySalesRepIdentifier: true,
            statusCwId: true,
            statusName: true,
            closedFlag: true,
            dateBecameLead: true,
            closedDate: true,
            probability: true,
          },
        });
      log(
        `opportunities fetched: assignedOpportunityRows=${opportunityRows.length}`,
      );

      events.emit("cache:salesMetrics:refresh:started", {
        activeMemberCount: memberIdentifiers.length,
        opportunityCount: opportunityRows.length,
      });

      if (memberIdentifiers.length === 0) {
        const emptyEnvelope: SalesMetricsCacheEnvelope = {
          generatedAt,
          activeMemberCount: 0,
          memberIdentifiers: [],
          members: {},
        };
        await redis.set(
          ALL_MEMBERS_KEY,
          JSON.stringify(emptyEnvelope),
          "PX",
          METRICS_CACHE_TTL_MS,
        );
        events.emit("cache:salesMetrics:refresh:completed", {
          activeMemberCount: 0,
          opportunityCount: 0,
          memberMetricsWritten: 0,
          cacheHitCount: 0,
          cacheMissCount: 0,
          durationMs: Date.now() - startedAt,
        });
        log("no active members found; wrote empty cache envelope");
        return;
      }

      // ---- Phase 3: revenue and probability per opportunity --------------
      const revenuePhaseStartedAt = Date.now();
      let revenueLookupProcessed = 0;
      let revenueLookupTimeouts = 0;
      let revenueLookupFailures = 0;
      let revenueLookupCacheHits = 0;
      let revenueLookupCacheMisses = 0;
      log(
        `revenue lookup phase started: concurrency=${PRODUCT_FETCH_CONCURRENCY} timeoutMs=${PRODUCT_LOOKUP_TIMEOUT_MS}`,
      );

      const revenueRows = await mapWithConcurrency(
        opportunityRows,
        PRODUCT_FETCH_CONCURRENCY,
        async (opp) => {
          const [revenue, probabilityRatio] = await Promise.all([
            withTimeout(
              getOpportunityRevenueCacheFirst(opp.cwOpportunityId, {
                forceColdLoad,
              }),
              PRODUCT_LOOKUP_TIMEOUT_MS,
            ).catch((err: any) => {
              // A lookup that times out or throws must not abort the run:
              // count it and fall back to zero revenue.
              if (err?.message === "Timeout") {
                revenueLookupTimeouts += 1;
              } else {
                revenueLookupFailures += 1;
              }
              return {
                totalRevenue: 0,
                taxableRevenue: 0,
                nonTaxableRevenue: 0,
                cacheHit: false,
              };
            }),
            resolveProbabilityRatio(opp),
          ]);

          revenueLookupProcessed += 1;
          if (revenue.cacheHit) {
            revenueLookupCacheHits += 1;
          } else {
            revenueLookupCacheMisses += 1;
          }
          if (revenueLookupProcessed % 100 === 0) {
            log(
              `revenue lookup progress: processed=${revenueLookupProcessed}/${opportunityRows.length} cacheHits=${revenueLookupCacheHits} cacheMisses=${revenueLookupCacheMisses} timeouts=${revenueLookupTimeouts} failures=${revenueLookupFailures}`,
            );
          }
          return { oppId: opp.id, revenue, probabilityRatio };
        },
      );
      log(
        `revenue lookup phase completed in ${Date.now() - revenuePhaseStartedAt}ms: processed=${revenueLookupProcessed}/${opportunityRows.length} cacheHits=${revenueLookupCacheHits} cacheMisses=${revenueLookupCacheMisses} timeouts=${revenueLookupTimeouts} failures=${revenueLookupFailures}`,
      );

      const revenueByOppId = new Map(
        revenueRows.map((row) => [row.oppId, row.revenue]),
      );
      const probabilityByOppId = new Map(
        revenueRows.map((row) => [row.oppId, row.probabilityRatio]),
      );

      // ---- Phase 4a: bucket opportunities by member ----------------------
      const opportunitiesByMember = new Map<string, OpportunityRow[]>();
      for (const identifier of memberIdentifiers) {
        opportunitiesByMember.set(identifier, []);
      }
      for (const opp of opportunityRows) {
        // A Set, so an opportunity whose primary and secondary rep are the
        // same person is counted once for that person.
        const assignedReps = new Set<string>();
        if (opp.primarySalesRepIdentifier) {
          assignedReps.add(opp.primarySalesRepIdentifier);
        }
        if (opp.secondarySalesRepIdentifier) {
          assignedReps.add(opp.secondarySalesRepIdentifier);
        }
        for (const identifier of assignedReps) {
          // Reps that are not active members have no bucket and are skipped.
          opportunitiesByMember.get(identifier)?.push(opp);
        }
      }

      // ---- Phase 4b: aggregate per member --------------------------------
      const members: Record<string, MemberSalesMetrics> = {};
      let aggregatedMembers = 0;
      log("member aggregation phase started");

      for (const member of activeMembers) {
        const identifier = member.identifier;
        const assigned = opportunitiesByMember.get(identifier) ?? [];
        const metric = buildEmptyMetrics(
          identifier,
          `${member.firstName} ${member.lastName}`.trim() || identifier,
          generatedAt,
        );
        let wonDaysSumYtd = 0;

        for (const opp of assigned) {
          const revenue = revenueByOppId.get(opp.id) ?? {
            totalRevenue: 0,
            taxableRevenue: 0,
            nonTaxableRevenue: 0,
            cacheHit: false,
          };
          if (revenue.cacheHit) {
            metric.cacheHitCount += 1;
          } else {
            metric.cacheMissCount += 1;
          }

          const won = isWon(opp);
          const lost = isLost(opp);
          const closed = isClosedOpportunity(opp);
          // Clamp to 0..1 so a stray value cannot inflate weighted revenue.
          const probabilityRatio = Math.max(
            0,
            Math.min(1, toFinite(probabilityByOppId.get(opp.id))),
          );
          const breakdownEntry: OpportunityBreakdownEntry = {
            id: opp.id,
            cwId: opp.cwOpportunityId,
            name: opp.name,
            revenue: revenue.totalRevenue,
            taxableRevenue: revenue.taxableRevenue,
            nonTaxableRevenue: revenue.nonTaxableRevenue,
            probability: roundCurrency(probabilityRatio * 100),
            weightedRevenue: roundCurrency(
              revenue.totalRevenue * probabilityRatio,
            ),
            closedDate: opp.closedDate?.toISOString() ?? null,
          };

          if (!closed) {
            metric.openOpportunityCount += 1;
            metric.pipelineRevenue += revenue.totalRevenue;
            metric.taxablePipelineRevenue += revenue.taxableRevenue;
            metric.nonTaxablePipelineRevenue += revenue.nonTaxableRevenue;
            metric.weightedPipelineRevenue +=
              revenue.totalRevenue * probabilityRatio;
            metric.opportunityBreakdown.pipeline.push(breakdownEntry);
          }

          // Win/loss buckets need a closed date to be placed in a period.
          const closedDate = opp.closedDate;
          if (!closedDate) continue;
          const isMtd = closedDate >= monthStart;
          const isYtd = closedDate >= yearStart;

          if (won) {
            if (isMtd) {
              metric.winCount.mtd += 1;
              metric.wonOpportunityCount.mtd += 1;
              metric.closedOpportunityCount.mtd += 1;
              metric.closedWonRevenueMtd += revenue.totalRevenue;
              metric.opportunityBreakdown.closedWonMtd.push(breakdownEntry);
            }
            if (isYtd) {
              metric.winCount.ytd += 1;
              metric.wonOpportunityCount.ytd += 1;
              metric.closedOpportunityCount.ytd += 1;
              metric.closedWonRevenueYtd += revenue.totalRevenue;
              // Without a lead date the deal contributes zero days.
              wonDaysSumYtd += daysBetween(
                opp.dateBecameLead ?? closedDate,
                closedDate,
              );
              metric.opportunityBreakdown.closedWonYtd.push(breakdownEntry);
            }
          }

          if (lost) {
            if (isMtd) {
              metric.lossCount.mtd += 1;
              metric.lostOpportunityCount.mtd += 1;
              metric.closedOpportunityCount.mtd += 1;
              metric.opportunityBreakdown.closedLostMtd.push(breakdownEntry);
            }
            if (isYtd) {
              metric.lossCount.ytd += 1;
              metric.lostOpportunityCount.ytd += 1;
              metric.closedOpportunityCount.ytd += 1;
              metric.opportunityBreakdown.closedLostYtd.push(breakdownEntry);
            }
          }
        }

        // Derived figures; every division is guarded against a zero divisor.
        metric.assignedOpportunityCount = assigned.length;
        metric.avgDaysToClose =
          metric.winCount.ytd > 0 ? wonDaysSumYtd / metric.winCount.ytd : 0;
        metric.avgOpenDealSize =
          metric.openOpportunityCount > 0
            ? metric.pipelineRevenue / metric.openOpportunityCount
            : 0;
        metric.avgWonDealSize.mtd =
          metric.winCount.mtd > 0
            ? metric.closedWonRevenueMtd / metric.winCount.mtd
            : 0;
        metric.avgWonDealSize.ytd =
          metric.winCount.ytd > 0
            ? metric.closedWonRevenueYtd / metric.winCount.ytd
            : 0;

        const closedMtd = metric.winCount.mtd + metric.lossCount.mtd;
        const closedYtd = metric.winCount.ytd + metric.lossCount.ytd;
        metric.winRate.mtd =
          closedMtd > 0 ? metric.winCount.mtd / closedMtd : 0;
        metric.winRate.ytd =
          closedYtd > 0 ? metric.winCount.ytd / closedYtd : 0;
        metric.lossRate.mtd =
          closedMtd > 0 ? metric.lossCount.mtd / closedMtd : 0;
        metric.lossRate.ytd =
          closedYtd > 0 ? metric.lossCount.ytd / closedYtd : 0;

        const totalLookups = metric.cacheHitCount + metric.cacheMissCount;
        metric.cacheHitRate =
          totalLookups > 0 ? metric.cacheHitCount / totalLookups : 0;

        // Round once, after all sums are complete, to avoid compounding
        // rounding error.
        metric.pipelineRevenue = roundCurrency(metric.pipelineRevenue);
        metric.closedWonRevenueMtd = roundCurrency(metric.closedWonRevenueMtd);
        metric.closedWonRevenueYtd = roundCurrency(metric.closedWonRevenueYtd);
        metric.weightedPipelineRevenue = roundCurrency(
          metric.weightedPipelineRevenue,
        );
        metric.taxablePipelineRevenue = roundCurrency(
          metric.taxablePipelineRevenue,
        );
        metric.nonTaxablePipelineRevenue = roundCurrency(
          metric.nonTaxablePipelineRevenue,
        );
        metric.avgDaysToClose = roundCurrency(metric.avgDaysToClose);
        metric.avgOpenDealSize = roundCurrency(metric.avgOpenDealSize);
        metric.avgWonDealSize.mtd = roundCurrency(metric.avgWonDealSize.mtd);
        metric.avgWonDealSize.ytd = roundCurrency(metric.avgWonDealSize.ytd);
        metric.winRate.mtd = roundCurrency(metric.winRate.mtd);
        metric.winRate.ytd = roundCurrency(metric.winRate.ytd);
        metric.lossRate.mtd = roundCurrency(metric.lossRate.mtd);
        metric.lossRate.ytd = roundCurrency(metric.lossRate.ytd);
        metric.cacheHitRate = roundCurrency(metric.cacheHitRate);

        members[identifier] = metric;
        aggregatedMembers += 1;
        if (aggregatedMembers % 25 === 0) {
          log(
            `member aggregation progress: aggregated=${aggregatedMembers}/${activeMembers.length}`,
          );
        }
      }

      const memberMetricsWritten = Object.keys(members).length;
      log(`member aggregation completed: totalMembers=${memberMetricsWritten}`);

      // ---- Phase 5: write to Redis ---------------------------------------
      const envelope: SalesMetricsCacheEnvelope = {
        generatedAt,
        activeMemberCount: memberIdentifiers.length,
        memberIdentifiers,
        members,
      };
      const pipeline = redis.pipeline();
      log("redis write phase started");
      pipeline.set(
        ALL_MEMBERS_KEY,
        JSON.stringify(envelope),
        "PX",
        METRICS_CACHE_TTL_MS,
      );
      for (const identifier of Object.keys(members)) {
        pipeline.set(
          memberKey(identifier),
          JSON.stringify(members[identifier]),
          "PX",
          METRICS_CACHE_TTL_MS,
        );
      }
      // exec() resolves with one [error, result] pair per queued command and
      // does not reject when a single command fails, so inspect the pairs.
      const writeResults: Array<[Error | null, unknown]> | null =
        await pipeline.exec();
      const failedWrites = (writeResults ?? []).filter(
        (entry: [Error | null, unknown]) => entry[0] !== null,
      ).length;
      if (failedWrites > 0) {
        log(`redis write phase reported failed commands: failed=${failedWrites}`);
      }
      log("redis write phase completed");

      const cacheHitCount = Object.values(members).reduce(
        (sum, metric) => sum + metric.cacheHitCount,
        0,
      );
      const cacheMissCount = Object.values(members).reduce(
        (sum, metric) => sum + metric.cacheMissCount,
        0,
      );
      events.emit("cache:salesMetrics:refresh:completed", {
        activeMemberCount: memberIdentifiers.length,
        opportunityCount: opportunityRows.length,
        memberMetricsWritten,
        cacheHitCount,
        cacheMissCount,
        durationMs: Date.now() - startedAt,
      });
      log(
        `completed in ${Date.now() - startedAt}ms | activeMembers=${memberIdentifiers.length} opportunities=${opportunityRows.length} memberMetrics=${memberMetricsWritten} cacheHits=${cacheHitCount} cacheMisses=${cacheMissCount}`,
      );
    } catch (error) {
      log(`refresh failed in ${Date.now() - startedAt}ms: ${String(error)}`);
      events.emit("cache:salesMetrics:refresh:error", {
        error,
        durationMs: Date.now() - startedAt,
      });
      throw error;
    }
  })().finally(() => {
    // Allow the next call to start a new run, whether this one succeeded or
    // failed.
    salesMetricsRefreshInFlight = null;
  });

  return salesMetricsRefreshInFlight;
}
/**
 * Reads the all-members metrics envelope from Redis.
 *
 * @returns The parsed envelope, or null when the key is missing/empty or its
 *          content is not valid JSON. The parsed value is not validated
 *          against the envelope shape.
 */
export async function getSalesOpportunityMetricsAll(): Promise<SalesMetricsCacheEnvelope | null> {
  const serialized = await redis.get(ALL_MEMBERS_KEY);
  if (serialized) {
    try {
      return JSON.parse(serialized) as SalesMetricsCacheEnvelope;
    } catch {
      // Unparseable content is treated the same as a missing key.
    }
  }
  return null;
}
/**
 * Returns the cached metrics of one member, or null when none are cached.
 *
 * The per-member key is tried first; the all-members envelope is the
 * fallback.
 *
 * Fixes over the previous version:
 *   - The envelope is keyed by the identifier exactly as stored for the
 *     member, but the fallback looked it up with the lowercased identifier,
 *     so it missed every member whose identifier contains an uppercase
 *     letter. The fallback now matches case-insensitively, preferring an
 *     exact match.
 *   - An unparseable per-member entry no longer ends the lookup; the envelope
 *     is consulted instead.
 *   - Only the envelope's own keys are considered, so identifiers such as
 *     "constructor" can no longer resolve to inherited object properties.
 *
 * @param identifier Member identifier; surrounding whitespace and casing are
 *                   ignored.
 */
export async function getSalesOpportunityMetricsForMember(
  identifier: string,
): Promise<MemberSalesMetrics | null> {
  const trimmed = identifier.trim();
  const normalized = trimmed.toLowerCase();
  if (!normalized) return null;

  const raw = await redis.get(memberKey(normalized));
  if (raw) {
    try {
      return JSON.parse(raw) as MemberSalesMetrics;
    } catch {
      // Corrupt per-member entry: fall through to the envelope.
    }
  }

  const all = await getSalesOpportunityMetricsAll();
  if (!all) return null;

  const members = all.members ?? {};
  const keys = Object.keys(members);
  const matchingKey =
    keys.find((key) => key === trimmed) ??
    keys.find((key) => key.toLowerCase() === normalized);
  if (matchingKey === undefined) return null;
  return members[matchingKey] ?? null;
}