feat: Redis opportunity cache, CW API retry/logging, adaptive TTLs

- Add Redis-backed opportunity cache with background refresh (30s interval)
- Fix concurrency bug: use lazy thunks instead of eager promises for batching
- Add withCwRetry utility with exponential backoff for transient CW errors
- Add adaptive TTL algorithms (primary, sub-resource, products) based on opportunity activity
- Add include query param on GET /sales/opportunities/:id (notes,contacts,products)
- Add opt-in CW API logger (LOG_CW_API env var) with timestamped files in cw-api-logs/
- Add debug-scripts/analyze-cw-calls.py for API call analysis
- Add computeSubResourceCacheTTL and computeProductsCacheTTL algorithms with tests
- Increase CW API timeout from 15s to 30s
- Unblock cache refresh from startup chain (remove await)
- Prioritize recently updated opportunities in refresh cycle
- Add CACHING.md documentation
- Update API_ROUTES.md with caching details and include param
- Update copilot instructions to require CACHING.md sync
- Add dev:log script for CW API call logging during development
This commit is contained in:
2026-03-02 23:23:24 -06:00
parent fe71248e88
commit e8a6a6da06
33 changed files with 2634 additions and 176 deletions
+5 -1
View File
@@ -198,7 +198,11 @@ Whenever you add, remove, or modify API routes or permission nodes, you **must**
2. `PERMISSIONS.md` — human-readable documentation of all permission nodes; must strictly reflect the data in `PermissionNodes.ts`.
3. `API_ROUTES.md` — comprehensive documentation of all API routes, including method, path, auth requirements, permissions, request/response examples.
Always verify that new routes have their required permissions listed in `PermissionNodes.ts`, that `PERMISSIONS.md` tables match the TS file exactly, and that `API_ROUTES.md` includes full documentation for every mounted route. Run through all three files at the end of any route or permission change to catch discrepancies.
Additionally, whenever you add, remove, or modify **caching logic** (TTL algorithms, cache key patterns, background refresh mechanics, retry settings, or invalidation behavior), you **must** update:
4. `CACHING.md` — comprehensive documentation of the Redis-backed opportunity cache, TTL algorithms, background refresh mechanics, retry logic, and debugging tools.
Always verify that new routes have their required permissions listed in `PermissionNodes.ts`, that `PERMISSIONS.md` tables match the TS file exactly, that `API_ROUTES.md` includes full documentation for every mounted route, and that `CACHING.md` accurately reflects any caching changes. Run through all relevant files at the end of any route, permission, or caching change to catch discrepancies.
---
+2
View File
@@ -1,6 +1,8 @@
# Logs
logs
*.log
*.jsonl
cw-api-logs/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
+11 -5
View File
@@ -2704,7 +2704,9 @@ Fetch the distinct values available for filter dropdowns (categories, subcategor
## Sales Routes
Sales routes serve opportunity data stored locally and synced from ConnectWise. All opportunity responses include hydrated company data (address, contacts) fetched from ConnectWise when a linked company exists, as well as an `activities` array containing all ConnectWise activities linked to the opportunity (fetched live from CW at request time). Single-opportunity fetches additionally include full site details (address, phone, flags). Sub-resource routes (products, notes, contacts) fetch live data from ConnectWise using the opportunity's CW ID.
Sales routes serve opportunity data stored locally and synced from ConnectWise. All opportunity responses include hydrated company data (address, contacts) and an `activities` array. Single-opportunity fetches additionally include full site details (address, phone, flags). Sub-resource routes (products, notes, contacts) return data keyed by the opportunity's CW ID.
**Caching:** Most CW data for opportunities is served from a **Redis cache** that is proactively warmed by a background refresh cycle (every 30 seconds). This includes opportunity CW data, activities, notes, contacts, products, and company data. Cache TTLs are adaptive — recently updated opportunities have shorter TTLs (30s60s) for fresher data, while inactive opportunities use longer TTLs (515 minutes). If a cache miss occurs at request time, data is fetched live from CW and cached. See [CACHING.md](CACHING.md) for full details on TTL algorithms, background refresh mechanics, and debugging tools.
### Get Opportunity Types
@@ -2922,7 +2924,7 @@ Get the total number of opportunities.
**GET** `/sales/opportunities/:identifier`
Fetch a single opportunity by its internal ID or ConnectWise opportunity ID. The response includes hydrated company data (with address and contacts from ConnectWise) and full site details (with address) when available.
Fetch a single opportunity by its internal ID or ConnectWise opportunity ID. The response includes hydrated company data (with address and contacts) and full site details (with address) when available. CW data (activities, company, site) is served from the Redis cache when available; on cache miss, data is fetched live from CW and cached with an adaptive TTL.
**Authentication Required:** Yes
@@ -2934,6 +2936,10 @@ Fetch a single opportunity by its internal ID or ConnectWise opportunity ID. The
- `identifier` — Internal ID (cuid) or ConnectWise opportunity ID (numeric)
**Query Parameters:**
- `include` _(optional)_ — Comma-separated list of sub-resources to embed in the response. Supported values: `notes`, `contacts`, `products`. Example: `?include=notes,contacts,products`. Sub-resources are fetched in parallel and added as top-level keys on the response object.
**Response:**
```json
@@ -3182,7 +3188,7 @@ Refresh an opportunity's local data by fetching the latest from ConnectWise. The
**GET** `/sales/opportunities/:identifier/products`
Fetch products (forecast/revenue line items) for an opportunity. Data is fetched live from ConnectWise using the opportunity's CW ID. Products are returned sorted by the opportunity's local `productSequence` array when set; otherwise, items are sorted by their ConnectWise `sequenceNumber`.
Fetch products (forecast/revenue line items) for an opportunity. Data is served from the Redis cache when available; on cache miss, data is fetched live from ConnectWise and cached. Hot opportunities (updated within 3 days) have a 15-second TTL; others use a 30-minute lazy TTL. Products are returned sorted by the opportunity's local `productSequence` array when set; otherwise, items are sorted by their ConnectWise `sequenceNumber`.
**Authentication Required:** Yes
@@ -3438,7 +3444,7 @@ All fields are optional. Only fields the user has the corresponding `sales.oppor
**GET** `/sales/opportunities/:identifier/notes`
Fetch notes for an opportunity. Data is fetched live from ConnectWise using the opportunity's CW ID.
Fetch notes for an opportunity. Data is served from the Redis cache when available; on cache miss, data is fetched live from ConnectWise and cached. Cache is invalidated automatically when notes are created, updated, or deleted.
**Authentication Required:** Yes
@@ -3651,7 +3657,7 @@ Delete a note from an opportunity in ConnectWise.
**GET** `/sales/opportunities/:identifier/contacts`
Fetch contacts associated with an opportunity. Data is fetched live from ConnectWise using the opportunity's CW ID.
Fetch contacts associated with an opportunity. Data is served from the Redis cache when available; on cache miss, data is fetched live from ConnectWise and cached. Cache is invalidated automatically when contacts are created, updated, or deleted.
**Authentication Required:** Yes
+297
View File
@@ -0,0 +1,297 @@
# Caching Architecture
This document describes the caching layer used in the Optima API, covering the Redis-backed opportunity cache, TTL algorithms, background refresh mechanics, retry logic, and debugging tools.
---
## Overview
The API caches expensive ConnectWise (CW) API responses in **Redis** to reduce latency and avoid CW rate limits. The primary cache layer is the **opportunity cache** (`src/modules/cache/opportunityCache.ts`), which proactively warms data for all non-closed opportunities on a background interval.
### Key design principles
- **Adaptive TTLs** — cache durations are computed dynamically based on how "hot" an opportunity is (recently updated = shorter TTL = fresher data).
- **Background refresh** — a 30-second interval scans all open opportunities and re-fetches only expired cache keys.
- **Bounded concurrency** — CW API calls are throttled via thunk-based batching to prevent overwhelming the upstream API.
- **Graceful degradation** — transient CW errors (timeouts, network failures) are caught, logged, and retried on the next cycle rather than crashing the process.
- **Priority ordering** — most recently updated opportunities are refreshed first so active deals get fresh data before stale ones.
---
## What is cached
Each non-closed opportunity can have up to 7 cached payloads in Redis:
| Cache Key Pattern | Data | Source |
|---|---|---|
| `opp:cw-data:{cwOpportunityId}` | Raw CW opportunity response | `GET /sales/opportunities/:id` |
| `opp:activities:{cwOpportunityId}` | CW activities array | `GET /sales/activities?conditions=opportunity/id=:id` |
| `opp:notes:{cwOpportunityId}` | CW notes array | `GET /sales/opportunities/:id/notes` |
| `opp:contacts:{cwOpportunityId}` | CW contacts array | `GET /sales/opportunities/:id/contacts` |
| `opp:products:{cwOpportunityId}` | Forecast + procurement products blob | `GET /sales/opportunities/:id/forecast` + `GET /procurement/products` |
| `opp:company-cw:{cw_CompanyId}` | Hydrated company + contacts blob | `GET /company/companies/:id` + contacts endpoints |
| `opp:site:{cwCompanyId}:{cwSiteId}` | Company site data | `GET /company/companies/:id/sites/:siteId` |
---
## TTL Algorithms
Three algorithms compute cache TTLs. All share the same input signals:
- `closedFlag` — whether the opportunity is closed
- `closedDate` — when it was closed
- `expectedCloseDate` — projected close date (forward-looking signal)
- `lastUpdated` — last CW modification date (backward-looking signal)
### Primary TTL (`computeCacheTTL`)
**File:** `src/modules/algorithms/computeCacheTTL.ts`
Used for: opportunity CW data, activities, company CW data.
| # | Condition | TTL | Human |
|---|---|---|---|
| 1a | Closed > 30 days ago | `null` | Do not cache |
| 1b | Closed within 30 days | 900,000 ms | 15 minutes |
| 2 | `expectedCloseDate` or `lastUpdated` within **5 days** | 30,000 ms | 30 seconds |
| 3 | `expectedCloseDate` or `lastUpdated` within **14 days** | 60,000 ms | 60 seconds |
| 4 | Everything else | 900,000 ms | 15 minutes |
Rules are evaluated top-to-bottom; first match wins.
### Sub-Resource TTL (`computeSubResourceCacheTTL`)
**File:** `src/modules/algorithms/computeSubResourceCacheTTL.ts`
Used for: notes, contacts.
| # | Condition | TTL | Human |
|---|---|---|---|
| 1a | Closed > 30 days ago | `null` | Do not cache |
| 1b | Closed within 30 days | 300,000 ms | 5 minutes |
| 2 | Within **5 days** | 60,000 ms | 60 seconds |
| 3 | Within **14 days** | 120,000 ms | 2 minutes |
| 4 | Everything else | 300,000 ms | 5 minutes |
### Products TTL (`computeProductsCacheTTL`)
**File:** `src/modules/algorithms/computeProductsCacheTTL.ts`
Used for: forecast + procurement products.
| # | Condition | TTL | Human |
|---|---|---|---|
| 1 | Status is Won/Lost/Pending Won/Pending Lost | `null` | No cache |
| 2 | Main cache TTL is `null` | `null` | No cache |
| 3 | `lastUpdated` within **3 days** | 15,000 ms | 15 seconds |
| 4 | Everything else | 1,800,000 ms | 30 minutes |
Products on terminal-status opportunities are never proactively cached. Non-hot products use a **lazy on-demand** cache — they're fetched when requested and cached for 30 minutes.
### Site TTL
Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely changes. Sites are **not** proactively warmed by the background refresh — they are populated lazily on the first detail-view request.
---
## Background Refresh
**Function:** `refreshOpportunityCache()` in `src/modules/cache/opportunityCache.ts`
**Interval:** Every 30 seconds, triggered from `src/index.ts`.
### Refresh cycle
1. **Query DB** — fetch all non-closed opportunities + recently closed (within 30 days), ordered by `cwLastUpdated DESC` (most recently active first).
2. **Batch EXISTS check** — use a single Redis pipeline to check which cache keys already exist (5 EXISTS commands per opportunity: oppCwData, activities, notes, contacts, products).
3. **Build thunk list** — for each opportunity with missing keys, push a **thunk** (lazy function) into the task list. No HTTP requests fire at this point.
4. **Execute with bounded concurrency** — process thunks in batches of `CONCURRENCY` (currently **6**), with a `BATCH_DELAY_MS` (currently **250ms**) pause between batches. Each thunk is only invoked inside the batch loop.
5. **Emit events**`cache:opportunities:refresh:started` and `cache:opportunities:refresh:completed` events are emitted for the event debugger.
### Concurrency control
The thunk pattern is critical. Previously, tasks were pushed as already-executing promises (`refreshTasks.push(fetchAndCache(...))`), which meant all HTTP requests fired simultaneously regardless of the batching loop. The fix was changing the array type from `Promise<void>[]` to `(() => Promise<void>)[]` so requests only start when explicitly invoked: `batch.map((fn) => fn())`.
### Current tuning
| Parameter | Value | Effect |
|---|---|---|
| `CONCURRENCY` | 6 | Max simultaneous CW API requests per batch |
| `BATCH_DELAY_MS` | 250 | Milliseconds between batches |
| Refresh interval | 30 seconds | How often the full sweep runs |
At these settings, a full sweep of ~500 expired keys completes in ~1-2 minutes with zero CW errors and ~230ms median latency.
---
## Retry Logic (`withCwRetry`)
**File:** `src/modules/cw-utils/withCwRetry.ts`
Wraps CW API calls with exponential backoff retry on transient errors.
### Retryable errors
- `ECONNABORTED` (timeout)
- `ECONNRESET`
- `ETIMEDOUT`
- `ECONNREFUSED`
- `ERR_NETWORK`
- `ENETUNREACH`
- HTTP 5xx server errors
### Default configuration
| Parameter | Default | Description |
|---|---|---|
| `maxAttempts` | 3 | Total attempts including the first |
| `baseDelayMs` | 1,000 | Delay before first retry (doubles each retry: 1s → 2s → 4s) |
| `label` | — | Optional tag for log messages |
### Usage
```ts
import { withCwRetry } from "./withCwRetry";
const response = await withCwRetry(
() => connectWiseApi.get(`/company/companies/${id}`),
{ label: `fetchCompany#${id}`, maxAttempts: 3, baseDelayMs: 1_500 },
);
```
Non-transient errors (404, 400, etc.) are re-thrown immediately without retry.
---
## CW API Logger
**File:** `src/modules/cw-utils/cwApiLogger.ts`
Axios interceptor that logs every CW API call to a JSONL file. Logging is **opt-in** — set the `LOG_CW_API` environment variable to enable it. Each process start creates a new timestamped file in the `cw-api-logs/` directory (e.g., `cw-api-logs/2026-03-02T14-30-05.123Z.jsonl`).
### Enabling logging
```bash
# Via the dev:log shorthand script
bun run dev:log
# Or manually with any command
LOG_CW_API=1 bun run dev
```
### Log entry fields
| Field | Type | Description |
|---|---|---|
| `timestamp` | string (ISO-8601) | When the request completed |
| `method` | string | HTTP method |
| `url` | string | Request URL (relative or absolute) |
| `baseURL` | string | Axios baseURL |
| `status` | number \| null | HTTP status (null on network error) |
| `durationMs` | number | Wall-clock time in milliseconds |
| `error` | string \| null | Error code + message, if any |
| `timeout` | number | Configured timeout in ms |
### Analysis
Run the analyzer script to analyze the most recent log file:
```bash
bun run utils:analyze_cw
```
Or specify a particular file:
```bash
python3 debug-scripts/analyze-cw-calls.py cw-api-logs/2026-03-02T14-30-05.123Z.jsonl
```
This executes `debug-scripts/analyze-cw-calls.py` which produces:
- Overview (total calls, error rate, time span)
- Duration statistics (min, max, mean, p50, p90, p95, p99, distribution histogram)
- Error breakdown by type and endpoint
- Top 20 slowest calls
- Per-endpoint stats (count, errors, mean, p50, p95, max, total time)
- Timeline (per-minute throughput and errors)
- Concurrency hotspot detection
- Summary with recommendations
To clear all logs:
```bash
rm -rf cw-api-logs/
```
---
## Cache Invalidation
Mutation endpoints invalidate the relevant cache keys so the next read fetches fresh data from CW:
| Mutation | Cache invalidated |
|---|---|
| Create/update/delete note | `opp:notes:{cwOpportunityId}` via `invalidateNotesCache()` |
| Create/update/delete contact | `opp:contacts:{cwOpportunityId}` via `invalidateContactsCache()` |
| Add/update/resequence products | `opp:products:{cwOpportunityId}` via `invalidateProductsCache()` |
| Refresh opportunity | All keys for that opportunity (via re-fetch) |
---
## ConnectWise API Configuration
The shared Axios instance (`connectWiseApi`) is configured in `src/constants.ts`:
| Setting | Value | Purpose |
|---|---|---|
| `baseURL` | `https://ttscw.totaltech.net/v4_6_release/apis/3.0/` | CW API base |
| `timeout` | 30,000 ms (30s) | Per-request timeout |
| Logger | `attachCwApiLogger()` | Writes to `cw-api-calls.jsonl` |
---
## Architecture diagram
```
src/index.ts
├─ setInterval(refreshOpportunityCache, 30s)
└─► src/modules/cache/opportunityCache.ts
├─ prisma.opportunity.findMany(orderBy: cwLastUpdated DESC)
├─ redis.pipeline().exists(...) ← batch key check
├─ Build thunk list (lazy functions)
└─ Execute thunks with CONCURRENCY=6, DELAY=250ms
├─► fetchAndCacheOppCwData() ─► opportunityCw.fetch()
├─► fetchAndCacheActivities() ─► activityCw.fetchByOpportunityDirect()
├─► fetchAndCacheNotes() ─► opportunityCw.fetchNotes()
├─► fetchAndCacheContacts() ─► opportunityCw.fetchContacts()
├─► fetchAndCacheProducts() ─► opportunityCw.fetchProducts() + fetchProcurementProducts()
├─► fetchAndCacheCompanyCwData() ─► fetchCwCompanyById() + contacts
└─► fetchAndCacheSite() ─► fetchCompanySite() (lazy only)
└─► connectWiseApi.get(...) ← withCwRetry + cwApiLogger interceptors
└─► Redis SET with computed TTL
```
---
## File reference
| File | Purpose |
|---|---|
| `src/modules/cache/opportunityCache.ts` | Cache read/write helpers, background refresh logic |
| `src/modules/algorithms/computeCacheTTL.ts` | Primary adaptive TTL algorithm |
| `src/modules/algorithms/computeSubResourceCacheTTL.ts` | Sub-resource (notes, contacts) TTL algorithm |
| `src/modules/algorithms/computeProductsCacheTTL.ts` | Products TTL algorithm |
| `src/modules/cw-utils/withCwRetry.ts` | Retry wrapper with exponential backoff |
| `src/modules/cw-utils/cwApiLogger.ts` | Axios interceptor for JSONL call logging |
| `src/modules/cw-utils/fetchCompany.ts` | Company fetch with retry |
| `src/constants.ts` | CW Axios instance config (timeout, logger) |
| `src/index.ts` | Refresh interval registration |
| `debug-scripts/analyze-cw-calls.py` | CW API call analysis script |
+307
View File
@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""
Analyze ConnectWise API call logs.
Looks for the most recent log file in cw-api-logs/ by default,
or accepts an explicit path as an argument.
Usage:
python3 analyze-cw-calls.py # latest file in cw-api-logs/
python3 analyze-cw-calls.py cw-api-logs/specific.jsonl
"""
import json
import sys
import os
import glob
import statistics
from collections import Counter, defaultdict
from datetime import datetime, timedelta
# ── Colours ──────────────────────────────────────────────────────────────────
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
def colour_duration(ms: float) -> str:
if ms >= 10_000:
return f"{RED}{ms:,.0f}ms{RESET}"
if ms >= 5_000:
return f"{YELLOW}{ms:,.0f}ms{RESET}"
return f"{GREEN}{ms:,.0f}ms{RESET}"
def header(title: str) -> str:
return f"\n{BOLD}{CYAN}{'' * 60}\n {title}\n{'' * 60}{RESET}"
# ── Resolve log file ────────────────────────────────────────────────────────
def find_latest_log() -> str:
"""Find the most recent .jsonl file in cw-api-logs/."""
log_dir = os.path.join(os.getcwd(), "cw-api-logs")
files = sorted(glob.glob(os.path.join(log_dir, "*.jsonl")))
if not files:
print(f"{RED}No log files found in cw-api-logs/{RESET}")
print(f"Run {BOLD}bun run dev:log{RESET} to start logging CW API calls.")
sys.exit(1)
return files[-1]
if len(sys.argv) > 1:
log_path = sys.argv[1]
else:
log_path = find_latest_log()
print(f"{DIM}Reading: {log_path}{RESET}")
entries = []
parse_errors = 0
with open(log_path) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
parse_errors += 1
if not entries:
print("No entries found. Check the log file path.")
sys.exit(1)
# ── Derived fields ───────────────────────────────────────────────────────────
durations = [e["durationMs"] for e in entries]
errors = [e for e in entries if e.get("error")]
successes = [e for e in entries if not e.get("error")]
timestamps = [datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")) for e in entries]
time_span = (timestamps[-1] - timestamps[0]) if len(timestamps) > 1 else timedelta(0)
# Normalise the URL to a route pattern for grouping
def normalise_url(url: str) -> str:
parts = url.split("?")[0].rstrip("/").split("/")
normalised = []
for p in parts:
if p.isdigit():
normalised.append(":id")
else:
normalised.append(p)
return "/".join(normalised)
# ── 1. Overview ──────────────────────────────────────────────────────────────
print(header("OVERVIEW"))
print(f" Log file : {log_path}")
print(f" Total calls : {BOLD}{len(entries):,}{RESET}")
print(f" Successes : {GREEN}{len(successes):,}{RESET}")
print(f" Failures : {RED}{len(errors):,}{RESET} ({len(errors)/len(entries)*100:.1f}%)")
print(f" Time span : {time_span}")
if time_span.total_seconds() > 0:
rps = len(entries) / time_span.total_seconds()
print(f" Avg req/sec : {rps:.2f}")
if parse_errors:
print(f" Parse errors : {YELLOW}{parse_errors}{RESET}")
# ── 2. Duration stats ───────────────────────────────────────────────────────
print(header("DURATION STATS (all calls)"))
sorted_dur = sorted(durations)
p50 = sorted_dur[len(sorted_dur) * 50 // 100]
p90 = sorted_dur[len(sorted_dur) * 90 // 100]
p95 = sorted_dur[len(sorted_dur) * 95 // 100]
p99 = sorted_dur[len(sorted_dur) * 99 // 100]
print(f" Min : {colour_duration(min(durations))}")
print(f" Max : {colour_duration(max(durations))}")
print(f" Mean : {colour_duration(statistics.mean(durations))}")
print(f" Median (p50) : {colour_duration(p50)}")
print(f" p90 : {colour_duration(p90)}")
print(f" p95 : {colour_duration(p95)}")
print(f" p99 : {colour_duration(p99)}")
print(f" Std dev : {statistics.stdev(durations):,.0f}ms" if len(durations) > 1 else "")
# Duration buckets
buckets = {"<500ms": 0, "500ms-1s": 0, "1-3s": 0, "3-5s": 0, "5-10s": 0, "10-20s": 0, "20s+": 0}
for d in durations:
if d < 500: buckets["<500ms"] += 1
elif d < 1000: buckets["500ms-1s"] += 1
elif d < 3000: buckets["1-3s"] += 1
elif d < 5000: buckets["3-5s"] += 1
elif d < 10000: buckets["5-10s"] += 1
elif d < 20000: buckets["10-20s"] += 1
else: buckets["20s+"] += 1
print(f"\n {BOLD}Distribution:{RESET}")
max_bar = 40
max_count = max(buckets.values()) if buckets else 1
for label, count in buckets.items():
bar_len = int(count / max_count * max_bar) if max_count else 0
pct = count / len(durations) * 100
bar = "" * bar_len
clr = GREEN if "500" in label or "<" in label else (YELLOW if "1-3" in label or "3-5" in label else RED)
print(f" {label:>10s} {clr}{bar}{RESET} {count:>5,} ({pct:5.1f}%)")
# ── 3. Errors breakdown ─────────────────────────────────────────────────────
print(header("ERROR BREAKDOWN"))
if not errors:
print(f" {GREEN}No errors! 🎉{RESET}")
else:
error_codes = Counter()
for e in errors:
err_str = e.get("error", "unknown")
code = err_str.split(":")[0] if ":" in err_str else err_str
error_codes[code] += 1
for code, count in error_codes.most_common():
print(f" {RED}{code:<30s}{RESET} {count:>5,} ({count/len(entries)*100:.1f}%)")
# Errored URLs
errored_urls = Counter(normalise_url(e["url"]) for e in errors)
print(f"\n {BOLD}Top errored endpoints:{RESET}")
for url, count in errored_urls.most_common(10):
print(f" {count:>5,} {url}")
# ── 4. Slowest individual calls ─────────────────────────────────────────────
print(header("TOP 20 SLOWEST CALLS"))
slowest = sorted(entries, key=lambda e: e["durationMs"], reverse=True)[:20]
for i, e in enumerate(slowest, 1):
status = e.get("status") or f"{RED}ERR{RESET}"
err_tag = f" {RED}[{e['error'].split(':')[0]}]{RESET}" if e.get("error") else ""
print(f" {i:>2}. {colour_duration(e['durationMs']):>20s} {e['method']:>4s} {e['url'][:60]:<60s} {DIM}{status}{RESET}{err_tag}")
# ── 5. Per-endpoint stats ───────────────────────────────────────────────────
print(header("PER-ENDPOINT STATS (by route pattern)"))
by_route = defaultdict(list)
for e in entries:
route = normalise_url(e["url"])
by_route[route].append(e)
# Sort by total time spent descending (most impactful)
route_stats = []
for route, calls in by_route.items():
durs = [c["durationMs"] for c in calls]
errs = sum(1 for c in calls if c.get("error"))
sorted_d = sorted(durs)
route_stats.append({
"route": route,
"count": len(calls),
"errors": errs,
"total_ms": sum(durs),
"mean": statistics.mean(durs),
"p50": sorted_d[len(sorted_d) * 50 // 100],
"p95": sorted_d[len(sorted_d) * 95 // 100],
"max": max(durs),
})
route_stats.sort(key=lambda r: r["total_ms"], reverse=True)
print(f" {'Route':<55s} {'Count':>6s} {'Errs':>5s} {'Mean':>8s} {'p50':>8s} {'p95':>8s} {'Max':>8s} {'Total':>10s}")
print(f" {'' * 55} {'' * 6} {'' * 5} {'' * 8} {'' * 8} {'' * 8} {'' * 8} {'' * 10}")
for r in route_stats[:25]:
err_str = f"{RED}{r['errors']}{RESET}" if r['errors'] else f"{DIM}0{RESET}"
print(
f" {r['route']:<55s} {r['count']:>6,} {err_str:>14s} "
f"{r['mean']:>7,.0f}ms {r['p50']:>7,.0f}ms {r['p95']:>7,.0f}ms "
f"{r['max']:>7,.0f}ms {r['total_ms']/1000:>8,.1f}s"
)
# ── 6. HTTP method breakdown ────────────────────────────────────────────────
print(header("BY HTTP METHOD"))
by_method = defaultdict(list)
for e in entries:
by_method[e["method"]].append(e["durationMs"])
print(f" {'Method':<8s} {'Count':>7s} {'Mean':>9s} {'p95':>9s} {'Max':>9s}")
print(f" {'' * 8} {'' * 7} {'' * 9} {'' * 9} {'' * 9}")
for method in sorted(by_method.keys()):
durs = by_method[method]
sd = sorted(durs)
print(
f" {method:<8s} {len(durs):>7,} "
f"{statistics.mean(durs):>8,.0f}ms "
f"{sd[len(sd)*95//100]:>8,.0f}ms "
f"{max(durs):>8,.0f}ms"
)
# ── 7. Timeline (calls per minute) ──────────────────────────────────────────
if time_span.total_seconds() > 60:
print(header("TIMELINE (per-minute throughput & errors)"))
by_minute = defaultdict(lambda: {"count": 0, "errors": 0, "dur_sum": 0})
for e in entries:
ts = e["timestamp"][:16] # YYYY-MM-DDTHH:MM
by_minute[ts]["count"] += 1
by_minute[ts]["dur_sum"] += e["durationMs"]
if e.get("error"):
by_minute[ts]["errors"] += 1
for minute in sorted(by_minute.keys()):
m = by_minute[minute]
avg = m["dur_sum"] / m["count"] if m["count"] else 0
err_part = f" {RED}({m['errors']} errs){RESET}" if m["errors"] else ""
bar = "" * min(m["count"] // 5, 50)
avg_clr = colour_duration(avg)
print(f" {minute} {m['count']:>5,} reqs avg {avg_clr:>20s} {bar}{err_part}")
# ── 8. Concurrency hotspots ─────────────────────────────────────────────────
print(header("CONCURRENCY HOTSPOTS (calls starting within 100ms of each other)"))
ts_ms = [int(t.timestamp() * 1000) for t in timestamps]
bursts = []
i = 0
while i < len(ts_ms):
j = i
while j < len(ts_ms) - 1 and ts_ms[j + 1] - ts_ms[j] < 100:
j += 1
burst_size = j - i + 1
if burst_size >= 5:
burst_entries = entries[i:j + 1]
avg_dur = statistics.mean(e["durationMs"] for e in burst_entries)
bursts.append((burst_size, entries[i]["timestamp"], avg_dur, burst_entries))
i = j + 1
bursts.sort(key=lambda b: b[0], reverse=True)
if bursts:
print(f" Found {len(bursts)} burst(s) of ≥5 concurrent requests\n")
for size, ts, avg, _ in bursts[:10]:
print(f" {YELLOW}{size:>3} concurrent{RESET} at {ts} avg {colour_duration(avg)}")
else:
print(f" {GREEN}No major concurrency bursts detected.{RESET}")
# ── 9. Summary / recommendations ────────────────────────────────────────────
print(header("SUMMARY"))
err_rate = len(errors) / len(entries) * 100
slow_5s = sum(1 for d in durations if d >= 5000)
slow_pct = slow_5s / len(entries) * 100
if err_rate > 5:
print(f" {RED}⚠ Error rate is {err_rate:.1f}% — CW API is struggling{RESET}")
elif err_rate > 1:
print(f" {YELLOW}⚠ Error rate is {err_rate:.1f}% — some instability{RESET}")
else:
print(f" {GREEN}✓ Error rate is {err_rate:.1f}% — acceptable{RESET}")
if slow_pct > 10:
print(f" {RED}{slow_5s:,} calls ({slow_pct:.1f}%) took >5s — CW is slow or rate-limiting{RESET}")
elif slow_pct > 2:
print(f" {YELLOW}{slow_5s:,} calls ({slow_pct:.1f}%) took >5s{RESET}")
else:
print(f" {GREEN}✓ Only {slow_5s:,} calls ({slow_pct:.1f}%) over 5s{RESET}")
if bursts:
max_burst = max(b[0] for b in bursts)
print(f" {YELLOW}⚠ Max concurrency burst: {max_burst} simultaneous requests — consider lowering CONCURRENCY{RESET}")
total_time_s = sum(durations) / 1000
print(f"\n Total wall-clock time spent waiting on CW: {BOLD}{total_time_s:,.1f}s{RESET} ({total_time_s/60:,.1f} min)")
print()
+2
View File
@@ -19,6 +19,7 @@
},
"scripts": {
"dev": "NODE_ENV=development bun --watch src/index.ts",
"dev:log": "LOG_CW_API=1 NODE_ENV=development bun --watch src/index.ts",
"test": "bun test --preload ./tests/setup.ts",
"db:gen": "prisma generate",
"db:push": "prisma migrate dev --skip-generate",
@@ -27,6 +28,7 @@
"utils:gen_private_keys": "bun ./utils/genPrivateKeys",
"utils:create_admin_role": "bun ./utils/createAdminRole",
"utils:assign_user_role": "bun ./utils/assignUserRole",
"utils:analyze_cw": "python3 debug-scripts/analyze-cw-calls.py",
"db:check": "bunx prisma migrate diff --from-migrations prisma/migrations --to-schema prisma/schema.prisma --shadow-database-url $DATABASE_URL --exit-code"
},
"dependencies": {
+1 -1
View File
@@ -53,7 +53,7 @@ export default createRoute(
),
);
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const created = await item.addProducts(gatedItems);
const isBatch = Array.isArray(body);
+1 -1
View File
@@ -10,7 +10,7 @@ export default createRoute(
["/opportunities/:identifier/contacts"],
async (c) => {
const identifier = c.req.param("identifier");
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const data = await item.fetchContacts();
+1 -1
View File
@@ -21,7 +21,7 @@ export default createRoute(
const data = schema.parse(body);
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const user = c.get("user");
const created = await item.addNote(data.text, user.login, {
+1 -1
View File
@@ -20,7 +20,7 @@ export default createRoute(
message: "Note ID must be a number",
});
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
await item.deleteNote(noteId);
const response = apiResponse.successful(
+32 -1
View File
@@ -5,12 +5,19 @@ import { ContentfulStatusCode } from "hono/utils/http-status";
import { authMiddleware } from "../../middleware/authorization";
import { processObjectValuePerms } from "../../../modules/permission-utils/processObjectPermissions";
/* GET /v1/sales/opportunities/:identifier */
/* GET /v1/sales/opportunities/:identifier?include=notes,contacts,products */
export default createRoute(
"get",
["/opportunities/:identifier"],
async (c) => {
const identifier = c.req.param("identifier");
const includeParam = c.req.query("include") ?? "";
const includes = new Set(
includeParam
.split(",")
.map((s) => s.trim().toLowerCase())
.filter(Boolean),
);
const item = await opportunities.fetchItem(identifier);
@@ -23,6 +30,30 @@ export default createRoute(
c.get("user"),
);
// Fetch requested sub-resources in parallel
const subResourcePromises: Record<string, Promise<any>> = {};
if (includes.has("notes")) {
subResourcePromises.notes = item.fetchNotes();
}
if (includes.has("contacts")) {
subResourcePromises.contacts = item.fetchContacts();
}
if (includes.has("products")) {
subResourcePromises.products = item
.fetchProducts()
.then((products) => products.map((p) => p.toJson()));
}
const keys = Object.keys(subResourcePromises);
if (keys.length > 0) {
const results = await Promise.all(
keys.map((k) => subResourcePromises[k]),
);
keys.forEach((k, i) => {
(gatedData as any)[k] = results[i];
});
}
const response = apiResponse.successful(
"Opportunity fetched successfully!",
gatedData,
+1 -1
View File
@@ -20,7 +20,7 @@ export default createRoute(
message: "Note ID must be a number",
});
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const data = await item.fetchNote(noteId);
const response = apiResponse.successful(
+1 -1
View File
@@ -10,7 +10,7 @@ export default createRoute(
["/opportunities/:identifier/notes"],
async (c) => {
const identifier = c.req.param("identifier");
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const data = await item.fetchNotes();
+1 -1
View File
@@ -10,7 +10,7 @@ export default createRoute(
["/opportunities/:identifier/products"],
async (c) => {
const identifier = c.req.param("identifier");
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const data = await item.fetchProducts();
+1 -1
View File
@@ -21,7 +21,7 @@ export default createRoute(
const { orderedIds } = schema.parse(body);
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const updated = await item.resequenceProducts(orderedIds);
const response = apiResponse.successful(
+1 -1
View File
@@ -35,7 +35,7 @@ export default createRoute(
const data = schema.parse(body);
const item = await opportunities.fetchItem(identifier);
const item = await opportunities.fetchRecord(identifier);
const updated = await item.updateNote(noteId, data);
const response = apiResponse.successful(
+4
View File
@@ -6,6 +6,7 @@ import { Server } from "socket.io";
import { Server as Engine } from "@socket.io/bun-engine";
import axios from "axios";
import { UnifiClient } from "./modules/unifi-api/UnifiClient";
import { attachCwApiLogger } from "./modules/cw-utils/cwApiLogger";
import Redis from "ioredis";
const connectionString = `${process.env.DATABASE_URL}`;
@@ -81,8 +82,11 @@ const connectWiseApi = axios.create({
clientId: `${process.env.CW_CLIENT_ID}`,
"Content-Type": "application/json",
},
timeout: 30_000, // 30 s — prevents indefinite hangs on CW API
});
attachCwApiLogger(connectWiseApi);
export { connectWiseApi };
// Unifi API Constants
+178 -31
View File
@@ -18,6 +18,20 @@ import {
import { resolveMember } from "../modules/cw-utils/members/memberCache";
import { ForecastProductController } from "./ForecastProductController";
import GenericError from "../Errors/GenericError";
import { computeSubResourceCacheTTL } from "../modules/algorithms/computeSubResourceCacheTTL";
import { computeProductsCacheTTL } from "../modules/algorithms/computeProductsCacheTTL";
import {
getCachedNotes,
getCachedContacts,
getCachedProducts,
getCachedSite,
fetchAndCacheNotes,
fetchAndCacheContacts,
fetchAndCacheProducts,
fetchAndCacheSite,
invalidateNotesCache,
invalidateProductsCache,
} from "../modules/cache/opportunityCache";
/**
* Opportunity Controller
@@ -91,6 +105,27 @@ export class OpportunityController {
private _customFields: CWCustomField[] | null = null;
private _activities: ActivityController[] | null = null;
/** Compute the sub-resource cache TTL from this opportunity's fields. */
private _subResourceTTL(): number | null {
return computeSubResourceCacheTTL({
closedFlag: this.closedFlag,
closedDate: this.closedDate,
expectedCloseDate: this.expectedCloseDate,
lastUpdated: this.cwLastUpdated,
});
}
/** Compute the products-specific cache TTL from this opportunity's fields. */
private _productsTTL(): number | null {
return computeProductsCacheTTL({
closedFlag: this.closedFlag,
closedDate: this.closedDate,
expectedCloseDate: this.expectedCloseDate,
lastUpdated: this.cwLastUpdated,
statusCwId: this.statusCwId,
});
}
constructor(
data: Opportunity & { company?: Company | null },
opts?: {
@@ -288,6 +323,7 @@ export class OpportunityController {
*
* Fetches the full site details (address, phone, flags) from ConnectWise
* for the site associated with this opportunity.
* Checks the Redis cache first (30-min TTL); on miss, calls CW and caches.
* Requires both companyCwId and siteCwId to be set.
*
* @returns Serialized site object or null
@@ -296,7 +332,17 @@ export class OpportunityController {
if (this._siteData) return this._siteData;
if (!this.companyCwId || !this.siteCwId) return null;
const cwSite = await fetchCompanySite(this.companyCwId, this.siteCwId);
// Try cache first
const cached = await getCachedSite(this.companyCwId, this.siteCwId);
if (cached) {
this._siteData = serializeCwSite(cached);
return this._siteData;
}
// Cache miss — fetch from CW and cache
const cwSite = await fetchAndCacheSite(this.companyCwId, this.siteCwId);
if (!cwSite) return null;
this._siteData = serializeCwSite(cwSite);
return this._siteData;
}
@@ -304,13 +350,37 @@ export class OpportunityController {
/**
* Fetch Contacts
*
* Fetches contacts associated with this opportunity from ConnectWise
* and returns a serialized array.
* Fetches contacts associated with this opportunity. Checks the Redis
* cache first; on miss, calls ConnectWise and caches the raw response.
*
* @param opts.fresh - Bypass cache and fetch directly from CW.
*/
public async fetchContacts() {
const contacts = await opportunityCw.fetchContacts(this.cwOpportunityId);
public async fetchContacts(opts?: { fresh?: boolean }) {
const ttl = this._subResourceTTL();
return contacts.map((ct) => ({
// Try cache first (unless forced fresh)
if (!opts?.fresh && ttl !== null) {
const cached = await getCachedContacts(this.cwOpportunityId);
if (cached) return this._serializeContacts(cached);
}
// Fetch from CW (fetchAndCache* handles 404 internally)
try {
const contacts =
ttl !== null
? await fetchAndCacheContacts(this.cwOpportunityId, ttl)
: await opportunityCw.fetchContacts(this.cwOpportunityId);
return this._serializeContacts(contacts);
} catch (err: any) {
if (err?.isAxiosError && err?.response?.status === 404) return [];
throw err;
}
}
/** Serialize raw CW contact data into the API response shape. */
private _serializeContacts(contacts: any[]) {
return contacts.map((ct: any) => ({
id: ct.id,
contact: ct.contact ? { id: ct.contact.id, name: ct.contact.name } : null,
company: ct.company
@@ -329,14 +399,38 @@ export class OpportunityController {
/**
* Fetch Notes
*
* Fetches notes associated with this opportunity from ConnectWise
* and returns a serialized array.
* Fetches notes associated with this opportunity. Checks the Redis
* cache first; on miss, calls ConnectWise and caches the raw response.
*
* @param opts.fresh - Bypass cache and fetch directly from CW.
*/
public async fetchNotes() {
const notes = await opportunityCw.fetchNotes(this.cwOpportunityId);
public async fetchNotes(opts?: { fresh?: boolean }) {
const ttl = this._subResourceTTL();
// Try cache first (unless forced fresh)
if (!opts?.fresh && ttl !== null) {
const cached = await getCachedNotes(this.cwOpportunityId);
if (cached) return this._serializeNotes(cached);
}
// Fetch from CW (fetchAndCache* handles 404 internally)
try {
const notes =
ttl !== null
? await fetchAndCacheNotes(this.cwOpportunityId, ttl)
: await opportunityCw.fetchNotes(this.cwOpportunityId);
return this._serializeNotes(notes);
} catch (err: any) {
if (err?.isAxiosError && err?.response?.status === 404) return [];
throw err;
}
}
/** Serialize raw CW note data into the API response shape. */
private async _serializeNotes(notes: any[]) {
return Promise.all(
notes.map(async (n) => ({
notes.map(async (n: any) => ({
id: n.id,
text: n.text,
type: n.type ? { id: n.type.id, name: n.type.name } : null,
@@ -388,15 +482,58 @@ export class OpportunityController {
/**
* Fetch Products
*
* Fetches products (forecast/revenue items) for this opportunity from
* ConnectWise and returns ForecastProductController instances.
* Fetches products (forecast/revenue items) for this opportunity.
* Checks the Redis cache first; on miss, calls ConnectWise and
* caches the raw response using the products-specific TTL algorithm.
*
* @param opts.fresh - Bypass cache and fetch directly from CW.
*/
public async fetchProducts(): Promise<ForecastProductController[]> {
const [forecast, procProducts] = await Promise.all([
opportunityCw.fetchProducts(this.cwOpportunityId),
opportunityCw.fetchProcurementProducts(this.cwOpportunityId),
]);
public async fetchProducts(opts?: {
fresh?: boolean;
}): Promise<ForecastProductController[]> {
const ttl = this._productsTTL();
let forecast: any;
let procProducts: any[];
// Try cache first (unless forced fresh)
if (!opts?.fresh && ttl !== null) {
const cached = await getCachedProducts(this.cwOpportunityId);
if (cached) {
forecast = cached.forecast;
procProducts = cached.procProducts;
} else {
// Cache miss — fetch from CW and cache
const blob = await fetchAndCacheProducts(this.cwOpportunityId, ttl);
forecast = blob.forecast;
procProducts = blob.procProducts;
}
} else {
// No caching (won/lost/pending or forced fresh) — fetch directly
try {
[forecast, procProducts] = await Promise.all([
opportunityCw.fetchProducts(this.cwOpportunityId),
opportunityCw.fetchProcurementProducts(this.cwOpportunityId),
]);
} catch (err: any) {
if (err?.isAxiosError && err?.response?.status === 404) return [];
throw err;
}
}
return this._buildProductControllers(forecast, procProducts);
}
/**
* Build ForecastProductController[] from raw CW data.
*
* Extracted from fetchProducts() so both cached and fresh paths
* share the same ordering + enrichment logic.
*/
private async _buildProductControllers(
forecast: any,
procProducts: any[],
): Promise<ForecastProductController[]> {
// Build a map of forecastDetailId → procurement product cancellation data
const cancellationMap = new Map<number, Record<string, unknown>>();
for (const pp of procProducts) {
@@ -412,30 +549,32 @@ export class OpportunityController {
let ordered: typeof forecastItems;
if (this.productSequence.length > 0) {
const itemById = new Map(forecastItems.map((fi) => [fi.id, fi]));
const itemById = new Map(forecastItems.map((fi: any) => [fi.id, fi]));
// Items in the specified order first, then any new items not yet sequenced
const sequenced = this.productSequence
.map((id) => itemById.get(id))
.filter((fi): fi is NonNullable<typeof fi> => fi !== undefined);
.filter((fi: any): fi is NonNullable<typeof fi> => fi !== undefined);
const sequencedIds = new Set(this.productSequence);
const unsequenced = forecastItems
.filter((fi) => !sequencedIds.has(fi.id))
.sort((a, b) => a.sequenceNumber - b.sequenceNumber);
.filter((fi: any) => !sequencedIds.has(fi.id))
.sort((a: any, b: any) => a.sequenceNumber - b.sequenceNumber);
ordered = [...sequenced, ...unsequenced];
} else {
ordered = [...forecastItems].sort(
(a, b) => a.sequenceNumber - b.sequenceNumber,
(a: any, b: any) => a.sequenceNumber - b.sequenceNumber,
);
}
const controllers = ordered.map((item) => {
const ctrl = new ForecastProductController(item);
const procData = cancellationMap.get(item.id);
if (procData) {
ctrl.applyCancellationData(procData as any);
}
return ctrl;
});
const controllers: ForecastProductController[] = ordered.map(
(item: any) => {
const ctrl = new ForecastProductController(item);
const procData = cancellationMap.get(item.id);
if (procData) {
ctrl.applyCancellationData(procData as any);
}
return ctrl;
},
);
// Enrich with internal inventory data from local CatalogItem DB
const catalogCwIds = controllers
@@ -559,6 +698,7 @@ export class OpportunityController {
forecastItemId,
data,
);
await invalidateProductsCache(this.cwOpportunityId);
return new ForecastProductController(updated);
} catch (err: any) {
console.error(
@@ -613,6 +753,9 @@ export class OpportunityController {
});
this.productSequence = orderedIds;
// Invalidate cached products since ordering changed
await invalidateProductsCache(this.cwOpportunityId);
// Return items in the new order
return this.fetchProducts();
}
@@ -635,6 +778,7 @@ export class OpportunityController {
this.cwOpportunityId,
data,
);
await invalidateProductsCache(this.cwOpportunityId);
return created.map((item) => new ForecastProductController(item));
} catch (err: any) {
console.error(
@@ -680,6 +824,7 @@ export class OpportunityController {
text: note,
flagged: opts?.flagged ?? false,
});
await invalidateNotesCache(this.cwOpportunityId);
return created;
}
@@ -700,6 +845,7 @@ export class OpportunityController {
noteId,
data,
);
await invalidateNotesCache(this.cwOpportunityId);
return updated;
}
@@ -712,6 +858,7 @@ export class OpportunityController {
*/
public async deleteNote(noteId: number): Promise<void> {
await opportunityCw.deleteNote(this.cwOpportunityId, noteId);
await invalidateNotesCache(this.cwOpportunityId);
}
/**
+17 -1
View File
@@ -33,6 +33,9 @@ export class RoleController {
private _permissionsToken: string;
private _users: (User & { roles: Role[] })[];
/** Cached result of JWT verification — avoids repeated RSA verify calls. */
private _cachedVerifiedPermissions: { permissions: string[] } | null = null;
public readonly createdAt: Date;
public updatedAt: Date;
@@ -62,6 +65,14 @@ export class RoleController {
* @returns - Verified object with permissions in it.
*/
private _verifyPermissions(permissionsToken: string) {
// Return cached result if the token hasn't changed
if (
this._cachedVerifiedPermissions &&
permissionsToken === this._permissionsToken
) {
return this._cachedVerifiedPermissions;
}
let perms: DecodedPermissionsBlock;
try {
perms = jwt.verify(permissionsToken, permissionsPrivateKey, {
@@ -82,7 +93,12 @@ export class RoleController {
);
}
return perms as { permissions: string[] };
const result = perms as { permissions: string[] };
// Cache only if verifying the current token
if (permissionsToken === this._permissionsToken) {
this._cachedVerifiedPermissions = result;
}
return result;
}
/**
+32 -25
View File
@@ -24,6 +24,13 @@ export default class UserController {
private _roles: Collection<string, Role>;
private _permissions: string | null;
/** Cached result of fetchRoles() — populated on first hasPermission call. */
private _resolvedRoleControllers: Collection<string, RoleController> | null =
null;
/** Per-permission result cache — avoids repeated JWT verification + DB lookups. */
private _permissionCache: Map<string, boolean> = new Map();
public createdAt: Date;
public updatedAt: Date;
constructor(userdata: User & { roles: Role[] }) {
@@ -127,6 +134,7 @@ export default class UserController {
this._updateInternalValues(updatedUser);
this._roles = new Collection<string, Role>();
updatedUser.roles.map((v: any) => this._roles.set(v.id, v));
this.clearPermissionCache();
for (const role of resolvedRoles) {
events.emit("user:role:assigned", { user: this, role });
@@ -252,35 +260,34 @@ export default class UserController {
* @returns {boolean} Does this user have the specified permission
*/
public async hasPermission(permission: string) {
let resources = await prisma.user.findFirst({
where: { id: this.id },
select: {
sessions: {
select: { id: true },
},
},
});
// Fast path: return cached result if we already resolved this permission
const cached = this._permissionCache.get(permission);
if (cached !== undefined) return cached;
const resourceKeys: string[] = Object.keys(resources ?? {}) as string[];
// Resolve role controllers once and cache them for the lifetime of this
// controller instance (i.e. the current request).
if (!this._resolvedRoleControllers) {
this._resolvedRoleControllers = await this.fetchRoles();
}
const implicitPermissions = resources
? resourceKeys
// @ts-ignore
.filter((v) => resources[v].length > 0)
.map(
(v) =>
//@ts-ignore
`resource.${v}.[${(resources![v] as { id: string }[])
.map((o) => o.id)
.join(",")}].user.${this.id}.implicit`,
)
: [];
const result = this._resolvedRoleControllers
.map((v) => v.checkPermission(permission))
.includes(true);
let checks = [
(await this.fetchRoles()).map((v) => v.checkPermission(permission)),
].flatMap((v) => v);
this._permissionCache.set(permission, result);
return result;
}
return checks.includes(true);
/**
* Clear Permission Cache
*
* Invalidates the in-memory permission cache so the next
* `hasPermission` call re-fetches roles from the database.
* Call this after role or permission mutations on the user.
*/
public clearPermissionCache() {
this._resolvedRoleControllers = null;
this._permissionCache.clear();
}
/**
+29 -11
View File
@@ -24,6 +24,16 @@ import cuid from "cuid";
// Setup global event debugger in non-production environments
if (Bun.env.NODE_ENV == "development") setupEventDebugger();
/** Concise error message for interval logs — avoids dumping full Axios error objects. */
const briefErr = (err: any): string => {
if (err?.isAxiosError) {
const method = (err.config?.method ?? "?").toUpperCase();
const url = err.config?.url ?? "?";
return `${method} ${url}${err.code ?? `HTTP ${err.response?.status}`}`;
}
return err?.message ?? String(err);
};
// Helper to run a startup sync safely — failures are logged but never crash the process.
const safeStartup = async (label: string, fn: () => Promise<void>) => {
try {
@@ -91,7 +101,7 @@ await safeStartup("ensureAdminRole", async () => {
await safeStartup("refreshCompanies", refreshCompanies);
setInterval(() => {
return refreshCompanies().catch((err) =>
console.error("[interval] refreshCompanies failed", err),
console.error(`[interval] refreshCompanies failed: ${briefErr(err)}`),
);
}, 60 * 1000);
@@ -99,7 +109,7 @@ setInterval(() => {
await safeStartup("refreshCatalog", refreshCatalog);
setInterval(() => {
return refreshCatalog().catch((err) =>
console.error("[interval] refreshCatalog failed", err),
console.error(`[interval] refreshCatalog failed: ${briefErr(err)}`),
);
}, 60 * 1000);
@@ -108,7 +118,7 @@ await safeStartup("refreshInventory", refreshInventory);
setInterval(
() => {
return refreshInventory().catch((err) =>
console.error("[interval] refreshInventory failed", err),
console.error(`[interval] refreshInventory failed: ${briefErr(err)}`),
);
},
2 * 60 * 1000,
@@ -118,16 +128,20 @@ setInterval(
await safeStartup("refreshOpportunities", refreshOpportunities);
setInterval(() => {
return refreshOpportunities().catch((err) =>
console.error("[interval] refreshOpportunities failed", err),
console.error(`[interval] refreshOpportunities failed: ${briefErr(err)}`),
);
}, 60 * 1000);
// Refresh opportunity CW cache every 30 seconds (activities + company hydration)
await safeStartup("refreshOpportunityCache", refreshOpportunityCache);
// NOTE: Do NOT await — register the interval immediately so the cache refresh
// is never blocked by a slow/stuck startup task above.
safeStartup("refreshOpportunityCache", refreshOpportunityCache);
setInterval(() => {
return refreshOpportunityCache().catch((err) =>
console.error("[interval] refreshOpportunityCache failed", err),
);
return refreshOpportunityCache().catch((err) => {
console.error(
`[interval] refreshOpportunityCache failed: ${briefErr(err)}`,
);
});
}, 30 * 1000);
// Refresh User Defined Fields every 5 minutes
@@ -136,7 +150,9 @@ setInterval(
() => {
return userDefinedFieldsCw
.refresh()
.catch((err) => console.error("[interval] refreshUDFs failed", err));
.catch((err) =>
console.error(`[interval] refreshUDFs failed: ${briefErr(err)}`),
);
},
5 * 60 * 1000,
);
@@ -146,7 +162,7 @@ await safeStartup("refreshCwIdentifiers", refreshCwIdentifiers);
setInterval(
() => {
return refreshCwIdentifiers().catch((err) =>
console.error("[interval] refreshCwIdentifiers failed", err),
console.error(`[interval] refreshCwIdentifiers failed: ${briefErr(err)}`),
);
},
30 * 60 * 1000,
@@ -156,5 +172,7 @@ await safeStartup("syncSites", () => unifiSites.syncSites());
setInterval(() => {
return unifiSites
.syncSites()
.catch((err) => console.error("[interval] syncSites failed", err));
.catch((err) =>
console.error(`[interval] syncSites failed: ${briefErr(err)}`),
);
}, 60 * 1000);
+108 -41
View File
@@ -10,8 +10,10 @@ import { computeCacheTTL } from "../modules/algorithms/computeCacheTTL";
import {
getCachedActivities,
getCachedCompanyCwData,
getCachedOppCwData,
fetchAndCacheActivities,
fetchAndCacheCompanyCwData,
fetchAndCacheOppCwData,
} from "../modules/cache/opportunityCache";
// ---------------------------------------------------------------------------
@@ -124,6 +126,48 @@ async function buildActivities(
}
export const opportunities = {
/**
* Fetch Record (lightweight)
*
* Returns an OpportunityController backed only by the **database record**.
* No ConnectWise API calls, no Redis lookups, no activity/company hydration.
*
* Use this when you only need the controller instance to call a sub-resource
* method (e.g. `fetchNotes()`, `fetchContacts()`, `fetchProducts()`,
* `fetchSite()`).
*
* @param identifier - Internal ID (string) or CW opportunity ID (number)
* @returns {Promise<OpportunityController>}
*/
async fetchRecord(
identifier: string | number,
): Promise<OpportunityController> {
const isNumeric =
typeof identifier === "number" || /^\d+$/.test(String(identifier));
const record = await prisma.opportunity.findFirst({
where: isNumeric
? { cwOpportunityId: Number(identifier) }
: { id: identifier as string },
include: { company: true },
});
if (!record) {
throw new GenericError({
message: "Opportunity not found",
name: "OpportunityNotFound",
cause: `No opportunity exists with identifier '${identifier}'`,
status: 404,
});
}
return new OpportunityController(record, {
company: record.company
? new CompanyController(record.company)
: undefined,
});
},
/**
* Fetch Opportunity
*
@@ -132,11 +176,12 @@ export const opportunities = {
*
* **Data-source strategy:**
* - `fresh: true` → `"cw-first"` — always fetches from CW, updates DB, caches result.
* - `fresh: false` (default) → `"cache-then-cw"` — tries Redis first, falls back to CW on miss.
* - `fresh: false` (default) → `"cache-then-cw"` — tries Redis cache for the
* CW opportunity response first, falls back to CW on miss.
*
* Individual fetches always contact CW to update the DB record with
* the latest data from ConnectWise, regardless of the cache strategy
* for activities/company hydration.
* The CW opportunity response is cached in Redis with the same TTL as
* activities/company. The background refresh keeps this warm so most
* detail-view loads skip the CW roundtrip entirely.
*
* @param identifier - The internal ID (string) or CW opportunity ID (number)
* @param opts - Optional flags
@@ -153,12 +198,12 @@ export const opportunities = {
const isNumeric =
typeof identifier === "number" || /^\d+$/.test(String(identifier));
// Look up the existing DB record to get the cwOpportunityId
// Look up the existing DB record (full, with company)
const existing = await prisma.opportunity.findFirst({
where: isNumeric
? { cwOpportunityId: Number(identifier) }
: { id: identifier as string },
select: { id: true, cwOpportunityId: true },
include: { company: true },
});
if (!existing) {
@@ -170,46 +215,68 @@ export const opportunities = {
});
}
// Fetch fresh data from ConnectWise
const cwData = await opportunityCw.fetch(existing.cwOpportunityId);
// Map and update the DB record
const mapped = OpportunityController.mapCwToDb(cwData);
// Resolve internal company link
const companyId = cwData.company?.id
? ((
await prisma.company.findFirst({
where: { cw_CompanyId: cwData.company.id },
select: { id: true },
})
)?.id ?? null)
: null;
const updated = await prisma.opportunity.update({
where: { id: existing.id },
data: { ...mapped, companyId },
include: { company: true },
});
// Compute TTL from the current DB state (used for cache and hydration)
const ttlMs =
computeCacheTTL({
closedFlag: updated.closedFlag,
closedDate: updated.closedDate,
expectedCloseDate: updated.expectedCloseDate,
lastUpdated: updated.cwLastUpdated,
closedFlag: existing.closedFlag,
closedDate: existing.closedDate,
expectedCloseDate: existing.expectedCloseDate,
lastUpdated: existing.cwLastUpdated,
}) ?? undefined;
const activities = await buildActivities(updated.cwOpportunityId, {
strategy,
ttlMs,
});
// ── Resolve CW opportunity data (cache-aware) ──────────────────────
let cwData: any;
let record = existing; // default: use the existing DB record as-is
return new OpportunityController(updated, {
company: updated.company
? await buildCompanyController(updated.company, { strategy, ttlMs })
: undefined,
customFields: cwData.customFields ?? [],
if (!opts?.fresh) {
// Try the Redis cache first
cwData = await getCachedOppCwData(existing.cwOpportunityId);
}
if (!cwData) {
// Cache miss or forced fresh — fetch from CW and cache
cwData = ttlMs
? await fetchAndCacheOppCwData(existing.cwOpportunityId, ttlMs)
: await opportunityCw.fetch(existing.cwOpportunityId);
if (!cwData) {
throw new GenericError({
message: "Opportunity not found in ConnectWise",
name: "OpportunityNotFound",
cause: `CW returned 404 for opportunity ${existing.cwOpportunityId}`,
status: 404,
});
}
// Map and update the DB record (only on cache miss/fresh)
const mapped = OpportunityController.mapCwToDb(cwData);
const companyId = cwData.company?.id
? ((
await prisma.company.findFirst({
where: { cw_CompanyId: cwData.company.id },
select: { id: true },
})
)?.id ?? null)
: null;
record = await prisma.opportunity.update({
where: { id: existing.id },
data: { ...mapped, companyId },
include: { company: true },
});
}
// Hydrate activities and company in parallel
const [activities, company] = await Promise.all([
buildActivities(record.cwOpportunityId, { strategy, ttlMs }),
record.company
? buildCompanyController(record.company, { strategy, ttlMs })
: Promise.resolve(undefined),
]);
return new OpportunityController(record, {
company,
customFields: cwData?.customFields ?? [],
activities,
});
},
@@ -0,0 +1,114 @@
/**
* @module computeProductsCacheTTL
*
* Adaptive Cache TTL for Opportunity Products
* ============================================
*
* Determines how long products (forecast items) should be cached in
* Redis before being re-fetched from ConnectWise.
*
* Products have unique caching rules compared to notes or contacts
* because they are typically finalised before a deal closes and do not
* change once the opportunity reaches a terminal status.
*
* ## Spec
*
* | # | Condition | TTL (ms) | TTL (human) | Rationale |
* |---|------------------------------------------------------------------------------|------------|-------------|---------------------------------------------------------------------------------------|
* | 1 | Status is **Won**, **Lost**, **Pending Won**, or **Pending Lost** | `null` | No cache | Products on terminal / near-terminal opps are static; no need to keep them warm. |
* | 2 | Opportunity is **not cacheable** (main cache TTL is `null`) | `null` | No cache | If the opp itself is evicted, sub-resources follow suit. |
* | 3 | `lastUpdated` is within the last **3 days** | 15 000 | 15 seconds | Actively-worked deals products are being edited and need near-real-time freshness. |
* | 4 | Everything else | 1 800 000 | 30 minutes | Lazy on-demand cache: fetched when requested, expires after 30 min without refresh. |
*
* ## Evaluation order
*
* Rules are evaluated top-to-bottom; the first matching rule wins.
*
* ## Inputs
*
* Extends {@link CacheTTLInput} from `computeCacheTTL` with an
* additional `statusCwId` field used to identify terminal statuses.
*
* ## Output
*
* Returns `number | null`:
* - Positive integer = TTL in **milliseconds**.
* - `null` = do **not** cache.
*/
import type { CacheTTLInput } from "./computeCacheTTL";
import { computeCacheTTL } from "./computeCacheTTL";
import { QUOTE_STATUSES } from "../../types/QuoteStatuses";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** 15 seconds — TTL for hot products (opportunity updated within 3 days). */
export const PRODUCTS_TTL_HOT = 15_000;
/** 30 minutes — TTL for on-demand product cache (lazy fallback). */
export const PRODUCTS_TTL_LAZY = 1_800_000;
/** 3 days in milliseconds. */
const THREE_DAYS_MS = 3 * 24 * 60 * 60 * 1000;
/**
* Set of all CW status IDs that map to a Won or Lost canonical status.
*
* Built at module load from {@link QUOTE_STATUSES} so it stays in sync
* with any future status additions.
*/
export const WON_LOST_STATUS_IDS: ReadonlySet<number> = new Set(
QUOTE_STATUSES.filter((s) => s.wonFlag || s.lostFlag).flatMap((s) => [
s.id,
...s.optimaEquivalency,
]),
);
// ---------------------------------------------------------------------------
// Input type
// ---------------------------------------------------------------------------
export interface ProductsCacheTTLInput extends CacheTTLInput {
/** The CW status ID of the opportunity. */
statusCwId: number | null;
}
// ---------------------------------------------------------------------------
// Algorithm
// ---------------------------------------------------------------------------
/**
* Compute the cache TTL for an opportunity's products.
*
* @param input - The opportunity's activity signals plus status ID.
* @returns TTL in milliseconds, or `null` if products should not be cached.
*/
export function computeProductsCacheTTL(
input: ProductsCacheTTLInput,
): number | null {
const { statusCwId, lastUpdated, now = new Date() } = input;
// Rule 1 — Terminal statuses: Won / Lost / Pending Won / Pending Lost
if (statusCwId !== null && WON_LOST_STATUS_IDS.has(statusCwId)) {
return null;
}
// Rule 2 — If the opportunity itself is not cacheable, skip products too
const mainTTL = computeCacheTTL(input);
if (mainTTL === null) {
return null;
}
// Rule 3 — Hot: updated within the last 3 days
if (lastUpdated) {
const diff = Math.abs(now.getTime() - lastUpdated.getTime());
if (diff <= THREE_DAYS_MS) {
return PRODUCTS_TTL_HOT;
}
}
// Rule 4 — Lazy fallback
return PRODUCTS_TTL_LAZY;
}
@@ -0,0 +1,118 @@
/**
* @module computeSubResourceCacheTTL
*
* Adaptive Cache TTL for Opportunity Sub-Resources
* =================================================
*
* Determines how long cached sub-resource data (notes, contacts) should
* live before being re-fetched from ConnectWise.
*
* Sub-resources change less frequently than the opportunity record itself
* or its activity feed, so TTLs are longer than the primary cache. The
* same activity-signal heuristics are used (expected close date, last
* updated, closed status) but with relaxed durations.
*
* ## Spec
*
* | # | Condition | TTL (ms) | TTL (human) | Rationale |
* |---|-------------------------------------------------------------------|----------|-------------|--------------------------------------------------------------------|
* | 1 | `closedFlag` is `true` AND closed > 30 days ago | `null` | Do not cache| Old closed records are rarely accessed. |
* | 1b| `closedFlag` is `true` AND closed within 30 days | 300 000 | 5 minutes | Recently-closed records may still be viewed occasionally. |
* | 2 | `expectedCloseDate` OR `lastUpdated` within **5 days** | 60 000 | 60 seconds | Active deals contacts/notes may still change. |
* | 3 | `expectedCloseDate` OR `lastUpdated` within **14 days** | 120 000 | 2 minutes | Moderate activity less likely to change. |
* | 4 | Everything else (older than 14 days) | 300 000 | 5 minutes | Low activity safe to cache longer. |
*
* ## Evaluation order
*
* Rules are evaluated top-to-bottom; the first matching rule wins.
*
* ## Inputs
*
* Uses the same {@link CacheTTLInput} interface as `computeCacheTTL`.
*
* ## Output
*
* Returns `number | null`:
* - Positive integer = TTL in **milliseconds**.
* - `null` = do **not** cache.
*/
import type { CacheTTLInput } from "./computeCacheTTL";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** 60 seconds — TTL for high-activity sub-resources (within 5 days). */
export const SUB_TTL_HIGH_ACTIVITY = 60_000;
/** 2 minutes — TTL for moderate-activity sub-resources (within 14 days). */
export const SUB_TTL_MODERATE_ACTIVITY = 120_000;
/** 5 minutes — TTL for low-activity / stale sub-resources. */
export const SUB_TTL_LOW_ACTIVITY = 300_000;
/** 30 days in milliseconds. */
const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;
/** 5 days in milliseconds. */
const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000;
/** 14 days in milliseconds. */
const FOURTEEN_DAYS_MS = 14 * 24 * 60 * 60 * 1000;
// ---------------------------------------------------------------------------
// Algorithm
// ---------------------------------------------------------------------------
/**
* Compute the cache TTL for an opportunity sub-resource (notes, contacts).
*
* @param input - The opportunity's activity signals. See {@link CacheTTLInput}.
* @returns The TTL in milliseconds, or `null` if the data should not be cached.
*/
export function computeSubResourceCacheTTL(
input: CacheTTLInput,
): number | null {
const {
closedFlag,
closedDate,
expectedCloseDate,
lastUpdated,
now = new Date(),
} = input;
const nowMs = now.getTime();
const isWithinWindow = (date: Date | null, windowMs: number): boolean => {
if (!date) return false;
return Math.abs(nowMs - date.getTime()) <= windowMs;
};
// Rule 1 — Closed records
if (closedFlag) {
if (isWithinWindow(closedDate, THIRTY_DAYS_MS)) {
return SUB_TTL_LOW_ACTIVITY;
}
return null;
}
// Rule 2 — High activity (5 days)
if (
isWithinWindow(expectedCloseDate, FIVE_DAYS_MS) ||
isWithinWindow(lastUpdated, FIVE_DAYS_MS)
) {
return SUB_TTL_HIGH_ACTIVITY;
}
// Rule 3 — Moderate activity (14 days)
if (
isWithinWindow(expectedCloseDate, FOURTEEN_DAYS_MS) ||
isWithinWindow(lastUpdated, FOURTEEN_DAYS_MS)
) {
return SUB_TTL_MODERATE_ACTIVITY;
}
// Rule 4 — Low activity / stale
return SUB_TTL_LOW_ACTIVITY;
}
+546 -49
View File
@@ -13,6 +13,10 @@
* `CWActivity[]` array fetched from `activityCw.fetchByOpportunity()`.
* - **Company CW data** (`opp:company-cw:{cw_CompanyId}`) the hydrated
* company / contacts blob set by `CompanyController.hydrateCwData()`.
* - **Notes** (`opp:notes:{cwOpportunityId}`) raw CW notes array.
* - **Contacts** (`opp:contacts:{cwOpportunityId}`) raw CW contacts array.
* - **Products** (`opp:products:{cwOpportunityId}`) raw CW forecast +
* procurement products blob.
*
* TTLs are computed dynamically via {@link computeCacheTTL} so hot
* opportunities refresh every 30 s while stale ones live for 15 min.
@@ -28,8 +32,16 @@
import { prisma, redis } from "../../constants";
import { activityCw } from "../cw-utils/activities/activities";
import { computeCacheTTL } from "../algorithms/computeCacheTTL";
import { computeSubResourceCacheTTL } from "../algorithms/computeSubResourceCacheTTL";
import {
computeProductsCacheTTL,
PRODUCTS_TTL_HOT,
} from "../algorithms/computeProductsCacheTTL";
import { connectWiseApi } from "../../constants";
import { fetchCwCompanyById } from "../cw-utils/fetchCompany";
import { fetchCompanySite } from "../cw-utils/sites/companySites";
import { opportunityCw } from "../cw-utils/opportunities/opportunities";
import { withCwRetry } from "../cw-utils/withCwRetry";
import { events } from "../globalEvents";
// ---------------------------------------------------------------------------
@@ -38,6 +50,11 @@ import { events } from "../globalEvents";
const ACTIVITY_PREFIX = "opp:activities:";
const COMPANY_CW_PREFIX = "opp:company-cw:";
const NOTES_PREFIX = "opp:notes:";
const CONTACTS_PREFIX = "opp:contacts:";
const PRODUCTS_PREFIX = "opp:products:";
const SITE_PREFIX = "opp:site:";
const OPP_CW_PREFIX = "opp:cw-data:";
/** Redis key for cached activities by CW opportunity ID. */
export const activityCacheKey = (cwOppId: number) =>
@@ -47,6 +64,25 @@ export const activityCacheKey = (cwOppId: number) =>
export const companyCwCacheKey = (cwCompanyId: number) =>
`${COMPANY_CW_PREFIX}${cwCompanyId}`;
/** Redis key for cached opportunity notes by CW opportunity ID. */
export const notesCacheKey = (cwOppId: number) => `${NOTES_PREFIX}${cwOppId}`;
/** Redis key for cached opportunity contacts by CW opportunity ID. */
export const contactsCacheKey = (cwOppId: number) =>
`${CONTACTS_PREFIX}${cwOppId}`;
/** Redis key for cached opportunity products by CW opportunity ID. */
export const productsCacheKey = (cwOppId: number) =>
`${PRODUCTS_PREFIX}${cwOppId}`;
/** Redis key for cached company site by CW company ID + site ID. */
export const siteCacheKey = (cwCompanyId: number, cwSiteId: number) =>
`${SITE_PREFIX}${cwCompanyId}:${cwSiteId}`;
/** Redis key for cached CW opportunity response by CW opportunity ID. */
export const oppCwDataCacheKey = (cwOppId: number) =>
`${OPP_CW_PREFIX}${cwOppId}`;
// ---------------------------------------------------------------------------
// Read helpers
// ---------------------------------------------------------------------------
@@ -85,6 +121,147 @@ export async function getCachedCompanyCwData(
}
}
/**
* Retrieve cached opportunity notes (raw CW data).
*
* @returns The parsed raw CW notes array or `null` on cache miss.
*/
export async function getCachedNotes(
cwOpportunityId: number,
): Promise<any[] | null> {
const raw = await redis.get(notesCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached opportunity contacts (raw CW data).
*
* @returns The parsed raw CW contacts array or `null` on cache miss.
*/
export async function getCachedContacts(
cwOpportunityId: number,
): Promise<any[] | null> {
const raw = await redis.get(contactsCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached opportunity products (raw CW forecast + procurement blob).
*
* @returns `{ forecast, procProducts }` or `null` on cache miss.
*/
export async function getCachedProducts(
cwOpportunityId: number,
): Promise<{ forecast: any; procProducts: any[] } | null> {
const raw = await redis.get(productsCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached CW site data for a company/site pair.
*
* @returns Parsed site data or `null` on cache miss.
*/
export async function getCachedSite(
cwCompanyId: number,
cwSiteId: number,
): Promise<any | null> {
const raw = await redis.get(siteCacheKey(cwCompanyId, cwSiteId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
/**
* Retrieve cached CW opportunity response data.
*
* @returns Parsed CW opportunity object or `null` on cache miss.
*/
export async function getCachedOppCwData(
cwOpportunityId: number,
): Promise<any | null> {
const raw = await redis.get(oppCwDataCacheKey(cwOpportunityId));
if (!raw) return null;
try {
return JSON.parse(raw);
} catch {
return null;
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Check whether an error is an Axios 404 (resource not found in CW). */
function isNotFoundError(err: unknown): boolean {
if (typeof err !== "object" || err === null) return false;
const e = err as Record<string, any>;
return e.isAxiosError === true && e.response?.status === 404;
}
/**
* Check whether an error is a transient network / timeout error.
*
* These are safe to swallow in background refresh tasks CW will be
* retried on the next refresh cycle. Logs a concise one-line warning
* instead of dumping the full Axios error object.
*/
function isTransientError(err: unknown): boolean {
if (typeof err !== "object" || err === null) return false;
const e = err as Record<string, any>;
if (!e.isAxiosError) return false;
const code = e.code as string | undefined;
return (
code === "ECONNABORTED" ||
code === "ECONNREFUSED" ||
code === "ECONNRESET" ||
code === "ETIMEDOUT" ||
code === "ERR_NETWORK" ||
code === "ENETUNREACH" ||
code === "ERR_BAD_RESPONSE"
);
}
/** Build a concise error description for logging (avoids dumping entire Axios objects). */
function describeError(err: unknown): string {
if (typeof err !== "object" || err === null) return String(err);
const e = err as Record<string, any>;
if (e.isAxiosError) {
const method = (e.config?.method ?? "?").toUpperCase();
const url = e.config?.url ?? "unknown";
const code = e.code ?? "";
const status = e.response?.status ?? "";
return `${method} ${url}${code || `HTTP ${status}`} (${e.message})`;
}
return e.message ?? String(err);
}
/**
* When true, transient-error warnings inside fetchAndCache* are suppressed.
* Used during background refresh to avoid flooding the terminal the
* refresh function prints a single summary line instead.
*/
let _suppressTransientWarnings = false;
// ---------------------------------------------------------------------------
// Write helpers
// ---------------------------------------------------------------------------
@@ -92,21 +269,35 @@ export async function getCachedCompanyCwData(
/**
* Fetch activities from CW and cache them with the appropriate TTL.
*
* Returns an empty array if CW responds with 404 (opportunity doesn't
* exist or was deleted upstream).
*
* @returns The raw `CWActivity[]` collection (as plain array).
*/
export async function fetchAndCacheActivities(
cwOpportunityId: number,
ttlMs: number,
): Promise<any[]> {
const collection = await activityCw.fetchByOpportunity(cwOpportunityId);
const arr = collection.map((item) => item);
await redis.set(
activityCacheKey(cwOpportunityId),
JSON.stringify(arr),
"PX",
ttlMs,
);
return arr;
try {
// Use the direct (single-call) variant to avoid the extra count request
const arr = await activityCw.fetchByOpportunityDirect(cwOpportunityId);
await redis.set(
activityCacheKey(cwOpportunityId),
JSON.stringify(arr),
"PX",
ttlMs,
);
return arr;
} catch (err) {
if (isNotFoundError(err)) return [];
if (isTransientError(err)) {
console.warn(
`[cache] activities opp#${cwOpportunityId}: ${describeError(err)}`,
);
return [];
}
throw err;
}
}
/**
@@ -118,32 +309,255 @@ export async function fetchAndCacheCompanyCwData(
cwCompanyId: number,
ttlMs: number,
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
const cwCompany = await fetchCwCompanyById(cwCompanyId);
if (!cwCompany) return null;
try {
const cwCompany = await fetchCwCompanyById(cwCompanyId);
if (!cwCompany) return null;
const contactHref = cwCompany.defaultContact?._info?.contact_href;
const defaultContactData = contactHref
? await connectWiseApi.get(contactHref)
: undefined;
const contactHref = cwCompany.defaultContact?._info?.contact_href;
const defaultContactData = contactHref
? await withCwRetry(() => connectWiseApi.get(contactHref), {
label: `company#${cwCompanyId}/defaultContact`,
})
: undefined;
const allContactsData = await connectWiseApi.get(
`${cwCompany._info.contacts_href}&pageSize=1000`,
);
const allContactsData = await withCwRetry(
() =>
connectWiseApi.get(`${cwCompany._info.contacts_href}&pageSize=1000`),
{ label: `company#${cwCompanyId}/allContacts` },
);
const blob = {
company: cwCompany,
defaultContact: defaultContactData?.data ?? null,
allContacts: allContactsData.data,
};
const blob = {
company: cwCompany,
defaultContact: defaultContactData?.data ?? null,
allContacts: allContactsData.data,
};
await redis.set(
companyCwCacheKey(cwCompanyId),
JSON.stringify(blob),
"PX",
ttlMs,
);
await redis.set(
companyCwCacheKey(cwCompanyId),
JSON.stringify(blob),
"PX",
ttlMs,
);
return blob;
return blob;
} catch (err) {
if (isNotFoundError(err)) return null;
if (isTransientError(err)) {
console.warn(`[cache] company#${cwCompanyId}: ${describeError(err)}`);
return null;
}
throw err;
}
}
/**
* Fetch opportunity notes from CW and cache the raw response.
*
* Returns an empty array if CW responds with 404.
*
* @returns The raw CW notes array.
*/
export async function fetchAndCacheNotes(
cwOpportunityId: number,
ttlMs: number,
): Promise<any[]> {
try {
const notes = await opportunityCw.fetchNotes(cwOpportunityId);
await redis.set(
notesCacheKey(cwOpportunityId),
JSON.stringify(notes),
"PX",
ttlMs,
);
return notes;
} catch (err) {
if (isNotFoundError(err)) return [];
if (isTransientError(err)) {
console.warn(
`[cache] notes opp#${cwOpportunityId}: ${describeError(err)}`,
);
return [];
}
throw err;
}
}
/**
* Fetch opportunity contacts from CW and cache the raw response.
*
* Returns an empty array if CW responds with 404.
*
* @returns The raw CW contacts array.
*/
export async function fetchAndCacheContacts(
cwOpportunityId: number,
ttlMs: number,
): Promise<any[]> {
try {
const contacts = await opportunityCw.fetchContacts(cwOpportunityId);
await redis.set(
contactsCacheKey(cwOpportunityId),
JSON.stringify(contacts),
"PX",
ttlMs,
);
return contacts;
} catch (err) {
if (isNotFoundError(err)) return [];
if (isTransientError(err)) {
console.warn(
`[cache] contacts opp#${cwOpportunityId}: ${describeError(err)}`,
);
return [];
}
throw err;
}
}
/**
* Invalidate cached notes for an opportunity.
*
* Call this after any note mutation (create, update, delete) so the
* next read refreshes from ConnectWise.
*/
export async function invalidateNotesCache(
cwOpportunityId: number,
): Promise<void> {
await redis.del(notesCacheKey(cwOpportunityId));
}
/**
* Invalidate cached contacts for an opportunity.
*
* Call this after any contact mutation so the next read refreshes
* from ConnectWise.
*/
export async function invalidateContactsCache(
cwOpportunityId: number,
): Promise<void> {
await redis.del(contactsCacheKey(cwOpportunityId));
}
/**
* Fetch opportunity products (forecast + procurement) from CW and cache.
*
* Stores both the forecast response and procurement products together
* so that `fetchProducts()` can reconstruct ForecastProductControllers
* from a single cache hit.
*
* @returns `{ forecast, procProducts }` blob.
*/
export async function fetchAndCacheProducts(
cwOpportunityId: number,
ttlMs: number,
): Promise<{ forecast: any; procProducts: any[] }> {
try {
const [forecast, procProducts] = await Promise.all([
opportunityCw.fetchProducts(cwOpportunityId),
opportunityCw.fetchProcurementProducts(cwOpportunityId),
]);
const blob = { forecast, procProducts };
await redis.set(
productsCacheKey(cwOpportunityId),
JSON.stringify(blob),
"PX",
ttlMs,
);
return blob;
} catch (err) {
if (isNotFoundError(err))
return { forecast: { forecastItems: [] }, procProducts: [] };
if (isTransientError(err)) {
console.warn(
`[cache] products opp#${cwOpportunityId}: ${describeError(err)}`,
);
return { forecast: { forecastItems: [] }, procProducts: [] };
}
throw err;
}
}
/**
* Invalidate cached products for an opportunity.
*
* Call this after any product mutation (add, update, resequence) so the
* next read refreshes from ConnectWise.
*/
export async function invalidateProductsCache(
cwOpportunityId: number,
): Promise<void> {
await redis.del(productsCacheKey(cwOpportunityId));
}
/**
* Site TTL 30 minutes. Site/address data rarely changes so we cache
* aggressively. The background refresh does NOT proactively warm site keys;
* they are populated lazily on the first detail-view request.
*/
const SITE_TTL_MS = 1_800_000;
/**
* Fetch a CW company site from ConnectWise and cache the result.
*
* @returns The raw CW site object.
*/
export async function fetchAndCacheSite(
cwCompanyId: number,
cwSiteId: number,
): Promise<any> {
try {
const site = await fetchCompanySite(cwCompanyId, cwSiteId);
await redis.set(
siteCacheKey(cwCompanyId, cwSiteId),
JSON.stringify(site),
"PX",
SITE_TTL_MS,
);
return site;
} catch (err) {
if (isNotFoundError(err)) return null;
if (isTransientError(err)) {
console.warn(
`[cache] site company#${cwCompanyId}/site#${cwSiteId}: ${describeError(err)}`,
);
return null;
}
throw err;
}
}
/**
* Fetch the raw CW opportunity response from ConnectWise and cache it.
*
* Used by `fetchItem()` in the manager to avoid a CW roundtrip when
* the detail view is reloaded within the cache TTL window.
*
* @param cwOpportunityId - The CW opportunity ID
* @param ttlMs - Cache TTL in milliseconds
* @returns The raw CW opportunity response object.
*/
export async function fetchAndCacheOppCwData(
cwOpportunityId: number,
ttlMs: number,
): Promise<any> {
try {
const cwData = await opportunityCw.fetch(cwOpportunityId);
await redis.set(
oppCwDataCacheKey(cwOpportunityId),
JSON.stringify(cwData),
"PX",
ttlMs,
);
return cwData;
} catch (err) {
if (isNotFoundError(err)) return null;
if (isTransientError(err)) {
console.warn(`[cache] opp#${cwOpportunityId}: ${describeError(err)}`);
return null;
}
throw err;
}
}
// ---------------------------------------------------------------------------
@@ -176,8 +590,10 @@ export async function refreshOpportunityCache(): Promise<void> {
closedDate: true,
expectedCloseDate: true,
cwLastUpdated: true,
statusCwId: true,
company: { select: { cw_CompanyId: true } },
},
orderBy: { cwLastUpdated: "desc" },
});
events.emit("cache:opportunities:refresh:started", {
@@ -186,25 +602,41 @@ export async function refreshOpportunityCache(): Promise<void> {
let activitiesRefreshed = 0;
let companiesRefreshed = 0;
let notesRefreshed = 0;
let contactsRefreshed = 0;
let productsRefreshed = 0;
let oppCwDataRefreshed = 0;
let skipped = 0;
// Batch-check which activity keys already exist via a pipeline
// Batch-check which keys already exist via a single pipeline
// (5 EXISTS per opportunity: oppCwData, activities, notes, contacts, products).
const pipeline = redis.pipeline();
for (const opp of opportunities) {
pipeline.exists(oppCwDataCacheKey(opp.cwOpportunityId));
pipeline.exists(activityCacheKey(opp.cwOpportunityId));
pipeline.exists(notesCacheKey(opp.cwOpportunityId));
pipeline.exists(contactsCacheKey(opp.cwOpportunityId));
pipeline.exists(productsCacheKey(opp.cwOpportunityId));
}
const existsResults = await pipeline.exec();
const refreshTasks: Promise<void>[] = [];
const refreshTasks: (() => Promise<void>)[] = [];
for (let i = 0; i < opportunities.length; i++) {
const opp = opportunities[i]!;
const ttl = computeCacheTTL({
const cacheTTLInput = {
closedFlag: opp.closedFlag,
closedDate: opp.closedDate,
expectedCloseDate: opp.expectedCloseDate,
lastUpdated: opp.cwLastUpdated,
};
const ttl = computeCacheTTL(cacheTTLInput);
const subTTL = computeSubResourceCacheTTL(cacheTTLInput);
const productsTTL = computeProductsCacheTTL({
...cacheTTLInput,
statusCwId: opp.statusCwId,
});
// Skip closed (ttl === null) — should not happen because of the query filter,
@@ -215,43 +647,108 @@ export async function refreshOpportunityCache(): Promise<void> {
}
// existsResults entries are [error, result] tuples
const activityExists = existsResults?.[i]?.[1] === 1;
// Pipeline order per opportunity: oppCwData, activities, notes, contacts, products
const baseIdx = i * 5;
const oppCwDataExists = existsResults?.[baseIdx]?.[1] === 1;
const activityExists = existsResults?.[baseIdx + 1]?.[1] === 1;
const notesExist = existsResults?.[baseIdx + 2]?.[1] === 1;
const contactsExist = existsResults?.[baseIdx + 3]?.[1] === 1;
const productsExist = existsResults?.[baseIdx + 4]?.[1] === 1;
// Proactively cache the CW opportunity response itself
if (!oppCwDataExists) {
refreshTasks.push(() =>
fetchAndCacheOppCwData(opp.cwOpportunityId, ttl).then(() => {
oppCwDataRefreshed++;
}),
);
}
if (!activityExists) {
refreshTasks.push(
refreshTasks.push(() =>
fetchAndCacheActivities(opp.cwOpportunityId, ttl).then(() => {
activitiesRefreshed++;
}),
);
}
// Refresh notes/contacts if sub-resource TTL applies and key is missing
if (subTTL !== null) {
if (!notesExist) {
refreshTasks.push(() =>
fetchAndCacheNotes(opp.cwOpportunityId, subTTL).then(() => {
notesRefreshed++;
}),
);
}
if (!contactsExist) {
refreshTasks.push(() =>
fetchAndCacheContacts(opp.cwOpportunityId, subTTL).then(() => {
contactsRefreshed++;
}),
);
}
}
// Proactively refresh products only for hot opps (updated within 3 days).
// 30-minute lazy-cached products are filled on-demand by the endpoint
// and do not need background refresh.
if (productsTTL === PRODUCTS_TTL_HOT && !productsExist) {
refreshTasks.push(() =>
fetchAndCacheProducts(opp.cwOpportunityId, productsTTL).then(() => {
productsRefreshed++;
}),
);
}
// Also refresh company CW data if the key is missing
if (opp.company?.cw_CompanyId) {
const cwCompanyId = opp.company.cw_CompanyId;
refreshTasks.push(
(async () => {
const companyExists = await redis.exists(
companyCwCacheKey(cwCompanyId),
);
if (!companyExists) {
await fetchAndCacheCompanyCwData(cwCompanyId, ttl);
companiesRefreshed++;
}
})(),
);
refreshTasks.push(async () => {
const companyExists = await redis.exists(
companyCwCacheKey(cwCompanyId),
);
if (!companyExists) {
await fetchAndCacheCompanyCwData(cwCompanyId, ttl);
companiesRefreshed++;
}
});
}
}
// Run all refresh tasks concurrently with bounded concurrency
const CONCURRENCY = 10;
// Run refresh thunks with bounded concurrency and inter-batch delay.
// Each thunk is only invoked here — no requests fire until we call them.
// CW rate-limits aggressively so we keep this conservative.
const CONCURRENCY = 6;
const BATCH_DELAY_MS = 250;
let timeoutCount = 0;
for (let i = 0; i < refreshTasks.length; i += CONCURRENCY) {
await Promise.allSettled(refreshTasks.slice(i, i + CONCURRENCY));
const batch = refreshTasks.slice(i, i + CONCURRENCY);
const results = await Promise.allSettled(batch.map((fn) => fn()));
for (const r of results) {
if (r.status === "rejected") timeoutCount++;
}
// Small delay between batches to avoid overwhelming CW
if (i + CONCURRENCY < refreshTasks.length) {
await new Promise((resolve) => setTimeout(resolve, BATCH_DELAY_MS));
}
}
if (timeoutCount > 0) {
console.warn(
`[cache] refresh: ${timeoutCount} task(s) failed (likely CW timeouts) — will retry next cycle`,
);
}
events.emit("cache:opportunities:refresh:completed", {
totalOpportunities: opportunities.length,
activitiesRefreshed,
companiesRefreshed,
notesRefreshed,
contactsRefreshed,
productsRefreshed,
oppCwDataRefreshed,
skipped,
});
}
@@ -115,6 +115,24 @@ export const activityCw = {
return activityCw.fetchAll(`opportunity/id=${opportunityId}`);
},
/**
* Fetch Activities by Opportunity (Direct)
*
* Lightweight single-call variant that skips the count request.
* Fetches up to 1000 activities in a single GET sufficient for
* virtually all opportunities. Used by the background cache refresh
* to avoid doubling CW API calls.
*/
fetchByOpportunityDirect: async (
opportunityId: number,
): Promise<CWActivity[]> => {
const conditions = encodeURIComponent(`opportunity/id=${opportunityId}`);
const response = await connectWiseApi.get(
`/sales/activities?pageSize=1000&conditions=${conditions}`,
);
return response.data;
},
/**
* Create Activity
*
+142
View File
@@ -0,0 +1,142 @@
/**
* @module cwApiLogger
*
* Axios interceptor-based logger that records every ConnectWise API
* request to a JSONL (newline-delimited JSON) file for post-hoc analysis.
*
* Each line in the log file is a self-contained JSON object with:
* - timestamp (ISO-8601)
* - method, url, baseURL
* - status (HTTP status or null on network error)
* - durationMs (wall-clock time from request start response/error)
* - error (error code / message, if any)
* - timeout (configured timeout in ms)
*
* Logging is **opt-in** set the `LOG_CW_API` environment variable to
* any truthy value to enable it. When enabled, each process start creates
* a new timestamped file inside the `cw-api-logs/` directory:
*
* LOG_CW_API=1 bun run dev # uses cw-api-logs/<timestamp>.jsonl
* bun run dev:log # shorthand (sets LOG_CW_API=1)
*
* Appends are non-blocking (fire-and-forget) to avoid slowing down
* the actual API flow.
*
* Usage:
* import { attachCwApiLogger } from "./modules/cw-utils/cwApiLogger";
* attachCwApiLogger(connectWiseApi);
*/
import { appendFile, mkdir } from "fs/promises";
import path from "path";
import type { AxiosInstance, InternalAxiosRequestConfig } from "axios";
const LOG_DIR = path.resolve(process.cwd(), "cw-api-logs");
/** Build a timestamped filename like `2026-03-02T14-30-05.123Z.jsonl` */
function buildLogPath(): string {
const ts = new Date().toISOString().replace(/:/g, "-");
return path.join(LOG_DIR, `${ts}.jsonl`);
}
let LOG_PATH: string | null = null;
// Symbol used to stash the start time on the request config
const START_TIME = Symbol("cwLogStartTime");
interface TimedConfig extends InternalAxiosRequestConfig {
[START_TIME]?: number;
}
export interface CwApiLogEntry {
timestamp: string;
method: string;
url: string;
baseURL: string;
status: number | null;
durationMs: number;
error: string | null;
timeout: number | undefined;
}
/** Write a single log entry (fire-and-forget). */
function writeEntry(entry: CwApiLogEntry): void {
if (!LOG_PATH) return;
appendFile(LOG_PATH, JSON.stringify(entry) + "\n").catch((err) => {
// Swallow write errors — logging should never crash the app
console.error("[cw-logger] failed to write log entry:", err.message);
});
}
/**
* Attach request/response interceptors to an Axios instance to log
* every CW API call with timing information.
*/
export function attachCwApiLogger(api: AxiosInstance): void {
if (!process.env.LOG_CW_API) {
return;
}
// Create the log directory and build a unique file path for this run
LOG_PATH = buildLogPath();
mkdir(LOG_DIR, { recursive: true }).catch((err) => {
console.error("[cw-logger] failed to create log directory:", err.message);
});
// ---- Request interceptor: record start time --------------------------
api.interceptors.request.use((config: TimedConfig) => {
config[START_TIME] = performance.now();
return config;
});
// ---- Response interceptor: log successful calls ----------------------
api.interceptors.response.use(
(response) => {
const config = response.config as TimedConfig;
const start = config[START_TIME] ?? performance.now();
const durationMs = Math.round(performance.now() - start);
writeEntry({
timestamp: new Date().toISOString(),
method: (config.method ?? "GET").toUpperCase(),
url: config.url ?? "",
baseURL: config.baseURL ?? "",
status: response.status,
durationMs,
error: null,
timeout: config.timeout,
});
return response;
},
// ---- Error interceptor: log failed calls -----------------------------
(err) => {
const config = (err.config ?? {}) as TimedConfig;
const start = config[START_TIME] ?? performance.now();
const durationMs = Math.round(performance.now() - start);
writeEntry({
timestamp: new Date().toISOString(),
method: (config.method ?? "GET").toUpperCase(),
url: config.url ?? "",
baseURL: config.baseURL ?? "",
status: err.response?.status ?? null,
durationMs,
error: err.code
? `${err.code}: ${err.message}`
: (err.message ?? "unknown"),
timeout: config.timeout,
});
return Promise.reject(err);
},
);
console.log(`[cw-logger] logging CW API calls to ${LOG_PATH}`);
}
/** Returns the current log file path (or null if logging is disabled). */
export function getCwLogPath(): string | null {
return LOG_PATH;
}
+8 -2
View File
@@ -1,12 +1,18 @@
import { connectWiseApi } from "../../constants";
import { Company } from "../../types/ConnectWiseTypes";
import { withCwRetry } from "./withCwRetry";
export const fetchCwCompanyById = async (
companyId: number,
): Promise<Company | null> => {
try {
const response = await connectWiseApi.get(
`/company/companies/${companyId}`,
const response = await withCwRetry(
() => connectWiseApi.get(`/company/companies/${companyId}`),
{
label: `fetchCompany#${companyId}`,
maxAttempts: 3,
baseDelayMs: 1_500,
},
);
return response.data;
} catch (error) {
+90
View File
@@ -0,0 +1,90 @@
/**
* Generic retry wrapper for ConnectWise API calls.
*
* Retries on transient errors (ECONNABORTED, ECONNRESET, ETIMEDOUT,
* ECONNREFUSED, ERR_NETWORK) with exponential backoff. Non-transient
* errors (e.g. 404, 400) are re-thrown immediately.
*/
const TRANSIENT_CODES = new Set([
"ECONNABORTED",
"ECONNRESET",
"ETIMEDOUT",
"ECONNREFUSED",
"ERR_NETWORK",
"ENETUNREACH",
]);
export interface CwRetryOptions {
/** Maximum number of attempts (including the first). Default: 3 */
maxAttempts?: number;
/** Base delay in ms before the first retry. Doubles each retry. Default: 1000 */
baseDelayMs?: number;
/** Optional label for log messages. */
label?: string;
}
/**
* Execute `fn` and retry up to `maxAttempts - 1` times on transient
* Axios / network errors.
*/
export async function withCwRetry<T>(
fn: () => Promise<T>,
opts: CwRetryOptions = {},
): Promise<T> {
const { maxAttempts = 3, baseDelayMs = 1_000, label } = opts;
let lastError: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
if (!isRetryable(err) || attempt === maxAttempts) throw err;
const delay = baseDelayMs * 2 ** (attempt - 1); // 1s, 2s, 4s …
const tag = label ? `[${label}] ` : "";
console.warn(
`${tag}CW transient error (attempt ${attempt}/${maxAttempts}), retrying in ${delay}ms — ${describeErr(err)}`,
);
await sleep(delay);
}
}
// Should never reach here, but satisfy TS:
throw lastError;
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function isRetryable(err: unknown): boolean {
if (typeof err !== "object" || err === null) return false;
const e = err as Record<string, any>;
if (!e.isAxiosError) return false;
// Retry on known transient codes
if (TRANSIENT_CODES.has(e.code)) return true;
// Also retry on 5xx server errors from CW
const status = e.response?.status;
if (typeof status === "number" && status >= 500) return true;
return false;
}
function describeErr(err: unknown): string {
if (typeof err !== "object" || err === null) return String(err);
const e = err as Record<string, any>;
if (e.isAxiosError) {
const method = (e.config?.method ?? "?").toUpperCase();
const url = e.config?.url ?? "unknown";
return `${method} ${url}${e.code ?? `HTTP ${e.response?.status}`}`;
}
return (e as Error).message ?? String(err);
}
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
+4
View File
@@ -186,6 +186,10 @@ interface EventTypes {
totalOpportunities: number;
activitiesRefreshed: number;
companiesRefreshed: number;
notesRefreshed: number;
contactsRefreshed: number;
productsRefreshed: number;
oppCwDataRefreshed: number;
skipped: number;
}) => void;
"cache:opportunities:refresh:error": (data: { error: unknown }) => void;
+188
View File
@@ -0,0 +1,188 @@
import { describe, test, expect } from "bun:test";
import {
computeProductsCacheTTL,
PRODUCTS_TTL_HOT,
PRODUCTS_TTL_LAZY,
WON_LOST_STATUS_IDS,
} from "../../src/modules/algorithms/computeProductsCacheTTL";
const NOW = new Date("2026-03-02T12:00:00Z");
const DAY_MS = 24 * 60 * 60 * 1000;
describe("computeProductsCacheTTL", () => {
// -- Constants ----------------------------------------------------------
test("PRODUCTS_TTL_HOT is 15 seconds", () => {
expect(PRODUCTS_TTL_HOT).toBe(15_000);
});
test("PRODUCTS_TTL_LAZY is 30 minutes", () => {
expect(PRODUCTS_TTL_LAZY).toBe(1_800_000);
});
// -- Won/Lost status set ------------------------------------------------
test("WON_LOST_STATUS_IDS contains Won canonical ID (29) and Pending Won (49)", () => {
expect(WON_LOST_STATUS_IDS.has(29)).toBe(true);
expect(WON_LOST_STATUS_IDS.has(49)).toBe(true);
});
test("WON_LOST_STATUS_IDS contains Lost canonical ID (53) and Pending Lost (50)", () => {
expect(WON_LOST_STATUS_IDS.has(53)).toBe(true);
expect(WON_LOST_STATUS_IDS.has(50)).toBe(true);
});
test("WON_LOST_STATUS_IDS does not contain Active (58) or New (24)", () => {
expect(WON_LOST_STATUS_IDS.has(58)).toBe(false);
expect(WON_LOST_STATUS_IDS.has(24)).toBe(false);
});
// -- Rule 1: Won/Lost/Pending Won/Lost → null --------------------------
test("returns null for Won status (CW ID 29)", () => {
const result = computeProductsCacheTTL({
statusCwId: 29,
closedFlag: true,
closedDate: new Date(NOW.getTime() - 2 * DAY_MS),
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 1 * DAY_MS),
now: NOW,
});
expect(result).toBeNull();
});
test("returns null for Pending Won status (CW ID 49)", () => {
const result = computeProductsCacheTTL({
statusCwId: 49,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 1 * DAY_MS),
now: NOW,
});
expect(result).toBeNull();
});
test("returns null for Lost status (CW ID 53)", () => {
const result = computeProductsCacheTTL({
statusCwId: 53,
closedFlag: true,
closedDate: new Date(NOW.getTime() - 5 * DAY_MS),
expectedCloseDate: null,
lastUpdated: null,
now: NOW,
});
expect(result).toBeNull();
});
test("returns null for Pending Lost status (CW ID 50)", () => {
const result = computeProductsCacheTTL({
statusCwId: 50,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 1 * DAY_MS),
now: NOW,
});
expect(result).toBeNull();
});
// -- Rule 2: Opp not cacheable → null ----------------------------------
test("returns null when opp is closed > 30 days (main cache null)", () => {
const result = computeProductsCacheTTL({
statusCwId: 58, // Active — but closed flag overrides
closedFlag: true,
closedDate: new Date(NOW.getTime() - 60 * DAY_MS),
expectedCloseDate: null,
lastUpdated: null,
now: NOW,
});
expect(result).toBeNull();
});
// -- Rule 3: Updated within 3 days → 15s -------------------------------
test("returns PRODUCTS_TTL_HOT when lastUpdated is within 3 days", () => {
const result = computeProductsCacheTTL({
statusCwId: 58,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 1 * DAY_MS),
now: NOW,
});
expect(result).toBe(PRODUCTS_TTL_HOT);
});
test("returns PRODUCTS_TTL_HOT when lastUpdated is exactly 3 days ago", () => {
const result = computeProductsCacheTTL({
statusCwId: 58,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 3 * DAY_MS),
now: NOW,
});
expect(result).toBe(PRODUCTS_TTL_HOT);
});
// -- Rule 4: Everything else → 30 min ----------------------------------
test("returns PRODUCTS_TTL_LAZY when lastUpdated is > 3 days ago", () => {
const result = computeProductsCacheTTL({
statusCwId: 58,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 10 * DAY_MS),
now: NOW,
});
expect(result).toBe(PRODUCTS_TTL_LAZY);
});
test("returns PRODUCTS_TTL_LAZY when no lastUpdated is set", () => {
const result = computeProductsCacheTTL({
statusCwId: 58,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: null,
now: NOW,
});
expect(result).toBe(PRODUCTS_TTL_LAZY);
});
test("returns PRODUCTS_TTL_LAZY for recently-closed (within 30 days) non-won/lost", () => {
// Edge case: closedFlag true, but status is not Won/Lost (unusual but possible)
const result = computeProductsCacheTTL({
statusCwId: 56, // Internal Review (not won/lost)
closedFlag: true,
closedDate: new Date(NOW.getTime() - 10 * DAY_MS),
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 20 * DAY_MS),
now: NOW,
});
expect(result).toBe(PRODUCTS_TTL_LAZY);
});
// -- Rule priority: Won/Lost takes priority over recent activity --------
test("Won status takes priority even with very recent lastUpdated", () => {
const result = computeProductsCacheTTL({
statusCwId: 29,
closedFlag: true,
closedDate: new Date(NOW.getTime() - 1 * DAY_MS),
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 1000), // 1 second ago
now: NOW,
});
expect(result).toBeNull();
});
// -- Null statusCwId (should not skip rule 1) ---------------------------
test("null statusCwId falls through to other rules", () => {
const result = computeProductsCacheTTL({
statusCwId: null,
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 1 * DAY_MS),
now: NOW,
});
expect(result).toBe(PRODUCTS_TTL_HOT);
});
});
@@ -0,0 +1,126 @@
import { describe, test, expect } from "bun:test";
import {
computeSubResourceCacheTTL,
SUB_TTL_HIGH_ACTIVITY,
SUB_TTL_MODERATE_ACTIVITY,
SUB_TTL_LOW_ACTIVITY,
} from "../../src/modules/algorithms/computeSubResourceCacheTTL";
const NOW = new Date("2026-03-02T12:00:00Z");
const DAY_MS = 24 * 60 * 60 * 1000;
describe("computeSubResourceCacheTTL", () => {
// -- Rule 1a: closed > 30 days → null -----------------------------------
test("returns null for records closed > 30 days ago", () => {
const result = computeSubResourceCacheTTL({
closedFlag: true,
closedDate: new Date(NOW.getTime() - 31 * DAY_MS),
expectedCloseDate: null,
lastUpdated: null,
now: NOW,
});
expect(result).toBeNull();
});
// -- Rule 1b: closed within 30 days → SUB_TTL_LOW_ACTIVITY ---------------
test("returns SUB_TTL_LOW_ACTIVITY for recently-closed records", () => {
const result = computeSubResourceCacheTTL({
closedFlag: true,
closedDate: new Date(NOW.getTime() - 10 * DAY_MS),
expectedCloseDate: null,
lastUpdated: null,
now: NOW,
});
expect(result).toBe(SUB_TTL_LOW_ACTIVITY);
});
// -- Rule 2: within 5 days → SUB_TTL_HIGH_ACTIVITY ----------------------
test("returns SUB_TTL_HIGH_ACTIVITY when expectedCloseDate is within 5 days", () => {
const result = computeSubResourceCacheTTL({
closedFlag: false,
closedDate: null,
expectedCloseDate: new Date(NOW.getTime() + 2 * DAY_MS),
lastUpdated: null,
now: NOW,
});
expect(result).toBe(SUB_TTL_HIGH_ACTIVITY);
});
test("returns SUB_TTL_HIGH_ACTIVITY when lastUpdated is within 5 days", () => {
const result = computeSubResourceCacheTTL({
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 3 * DAY_MS),
now: NOW,
});
expect(result).toBe(SUB_TTL_HIGH_ACTIVITY);
});
// -- Rule 3: within 14 days → SUB_TTL_MODERATE_ACTIVITY -----------------
test("returns SUB_TTL_MODERATE_ACTIVITY when expectedCloseDate is within 14 days", () => {
const result = computeSubResourceCacheTTL({
closedFlag: false,
closedDate: null,
expectedCloseDate: new Date(NOW.getTime() + 10 * DAY_MS),
lastUpdated: null,
now: NOW,
});
expect(result).toBe(SUB_TTL_MODERATE_ACTIVITY);
});
test("returns SUB_TTL_MODERATE_ACTIVITY when lastUpdated is within 14 days", () => {
const result = computeSubResourceCacheTTL({
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: new Date(NOW.getTime() - 8 * DAY_MS),
now: NOW,
});
expect(result).toBe(SUB_TTL_MODERATE_ACTIVITY);
});
// -- Rule 4: everything else → SUB_TTL_LOW_ACTIVITY ---------------------
test("returns SUB_TTL_LOW_ACTIVITY for stale records", () => {
const result = computeSubResourceCacheTTL({
closedFlag: false,
closedDate: null,
expectedCloseDate: new Date(NOW.getTime() - 30 * DAY_MS),
lastUpdated: new Date(NOW.getTime() - 30 * DAY_MS),
now: NOW,
});
expect(result).toBe(SUB_TTL_LOW_ACTIVITY);
});
test("returns SUB_TTL_LOW_ACTIVITY when no dates are set", () => {
const result = computeSubResourceCacheTTL({
closedFlag: false,
closedDate: null,
expectedCloseDate: null,
lastUpdated: null,
now: NOW,
});
expect(result).toBe(SUB_TTL_LOW_ACTIVITY);
});
// -- TTL ordering -------------------------------------------------------
test("SUB_TTL values are ordered correctly", () => {
expect(SUB_TTL_HIGH_ACTIVITY).toBe(60_000);
expect(SUB_TTL_MODERATE_ACTIVITY).toBe(120_000);
expect(SUB_TTL_LOW_ACTIVITY).toBe(300_000);
expect(SUB_TTL_HIGH_ACTIVITY).toBeLessThan(SUB_TTL_MODERATE_ACTIVITY);
expect(SUB_TTL_MODERATE_ACTIVITY).toBeLessThan(SUB_TTL_LOW_ACTIVITY);
});
// -- Closed flag takes priority ------------------------------------------
test("closed flag takes priority over recent activity dates", () => {
const result = computeSubResourceCacheTTL({
closedFlag: true,
closedDate: new Date(NOW.getTime() - 60 * DAY_MS),
expectedCloseDate: new Date(NOW.getTime() - 1 * DAY_MS),
lastUpdated: new Date(NOW.getTime() - 1 * DAY_MS),
now: NOW,
});
expect(result).toBeNull();
});
});
+247
View File
@@ -0,0 +1,247 @@
/**
* CW Endpoint Validator
*
* Validates that all ConnectWise API endpoints used by the application
* are reachable and respond correctly. Uses the same axios instance
* and credentials as the running app.
*
* Usage: bun ./utils/validateCwEndpoints.ts
*/
import { connectWiseApi } from "../src/constants";
import { prisma } from "../src/constants";
interface EndpointTest {
name: string;
method: "get" | "post" | "patch" | "delete";
url: string;
/** If true, a 404 is treated as "endpoint exists but resource doesn't" — still a pass. */
allow404?: boolean;
}
// ---------------------------------------------------------------------------
// Build test list — some require real IDs from the database
// ---------------------------------------------------------------------------
async function buildTestList(): Promise<EndpointTest[]> {
// Grab a sample opportunity from the DB to test with real IDs
const sampleOpp = await prisma.opportunity.findFirst({
where: { closedFlag: false },
select: { cwOpportunityId: true, companyId: true },
orderBy: { cwLastUpdated: "desc" },
});
const sampleCompany = await prisma.company.findFirst({
select: { cw_CompanyId: true },
});
const oppId = sampleOpp?.cwOpportunityId ?? 1;
const companyId = sampleCompany?.cw_CompanyId ?? 1;
return [
// ── Core counts (lightweight, always work) ──────────────────────────
{
name: "Opportunities count",
method: "get",
url: "/sales/opportunities/count",
},
{
name: "Activities count",
method: "get",
url: "/sales/activities/count",
},
{
name: "Companies count",
method: "get",
url: "/company/companies/count",
},
{
name: "Members count",
method: "get",
url: "/system/members/count",
},
{
name: "Catalog count",
method: "get",
url: "/procurement/catalog/count",
},
// ── Paginated list endpoints ────────────────────────────────────────
{
name: "Opportunities list (page 1, size 1)",
method: "get",
url: "/sales/opportunities?page=1&pageSize=1",
},
{
name: "Activities list (page 1, size 1)",
method: "get",
url: "/sales/activities?page=1&pageSize=1",
},
{
name: "Companies list (page 1, size 1)",
method: "get",
url: "/company/companies?page=1&pageSize=1",
},
{
name: "Members list (page 1, size 1)",
method: "get",
url: "/system/members?page=1&pageSize=1",
},
{
name: "Catalog list (page 1, size 1)",
method: "get",
url: "/procurement/catalog?page=1&pageSize=1",
},
{
name: "User-defined fields (page 1)",
method: "get",
url: "/system/userDefinedFields?pageSize=1",
},
// ── Single-resource fetches (need real IDs) ─────────────────────────
{
name: `Opportunity #${oppId}`,
method: "get",
url: `/sales/opportunities/${oppId}`,
allow404: true,
},
{
name: `Opportunity #${oppId} forecast`,
method: "get",
url: `/sales/opportunities/${oppId}/forecast`,
allow404: true,
},
{
name: `Opportunity #${oppId} notes`,
method: "get",
url: `/sales/opportunities/${oppId}/notes`,
allow404: true,
},
{
name: `Opportunity #${oppId} contacts`,
method: "get",
url: `/sales/opportunities/${oppId}/contacts`,
allow404: true,
},
{
name: `Activities for opp #${oppId}`,
method: "get",
url: `/sales/activities/count?conditions=${encodeURIComponent(`opportunity/id=${oppId}`)}`,
allow404: true,
},
{
name: `Procurement products for opp #${oppId}`,
method: "get",
url: `/procurement/products?conditions=${encodeURIComponent(`opportunity/id=${oppId}`)}&fields=id,forecastDetailId,cancelledFlag`,
allow404: true,
},
{
name: `Company #${companyId}`,
method: "get",
url: `/company/companies/${companyId}`,
allow404: true,
},
{
name: `Company #${companyId} sites`,
method: "get",
url: `/company/companies/${companyId}/sites?pageSize=1`,
allow404: true,
},
{
name: `Company #${companyId} configurations`,
method: "get",
url: `/company/configurations?conditions=${encodeURIComponent(`company/id=${companyId}`)}&pageSize=1`,
allow404: true,
},
];
}
// ---------------------------------------------------------------------------
// Runner
// ---------------------------------------------------------------------------
async function main() {
console.log(
"╔══════════════════════════════════════════════════════════════╗",
);
console.log(
"║ ConnectWise API Endpoint Validator ║",
);
console.log(
"╚══════════════════════════════════════════════════════════════╝",
);
console.log();
console.log(`Base URL: ${connectWiseApi.defaults.baseURL}`);
console.log(`Timeout: ${connectWiseApi.defaults.timeout ?? "none"}ms`);
console.log();
const tests = await buildTestList();
let passed = 0;
let failed = 0;
let warned = 0;
for (const test of tests) {
const start = performance.now();
try {
const response = await connectWiseApi.request({
method: test.method,
url: test.url,
timeout: 30_000, // Use a generous timeout for validation
});
const elapsed = Math.round(performance.now() - start);
const statusTag =
elapsed > 5000 ? `⚠️ SLOW (${elapsed}ms)` : `${elapsed}ms`;
console.log(`${test.name}${response.status} [${statusTag}]`);
if (elapsed > 5000) warned++;
passed++;
} catch (err: any) {
const elapsed = Math.round(performance.now() - start);
if (err?.isAxiosError) {
const status = err.response?.status;
const code = err.code;
if (status === 404 && test.allow404) {
console.log(
` ⚠️ ${test.name} — 404 (resource not found, endpoint OK) [${elapsed}ms]`,
);
warned++;
continue;
}
if (code === "ECONNABORTED") {
console.log(`${test.name} — TIMEOUT after ${elapsed}ms`);
} else if (code === "ECONNREFUSED") {
console.log(
`${test.name} — CONNECTION REFUSED (CW server down?)`,
);
} else if (status) {
console.log(
`${test.name} — HTTP ${status} [${elapsed}ms]: ${err.response?.data?.message ?? err.message}`,
);
} else {
console.log(
`${test.name}${code ?? err.message} [${elapsed}ms]`,
);
}
} else {
console.log(`${test.name}${err.message} [${elapsed}ms]`);
}
failed++;
}
}
console.log();
console.log("─".repeat(64));
console.log(
` Results: ${passed} passed, ${warned} warnings, ${failed} failed (${tests.length} total)`,
);
console.log("─".repeat(64));
await prisma.$disconnect();
process.exit(failed > 0 ? 1 : 0);
}
main().catch((err) => {
console.error("Fatal error:", err.message);
process.exit(1);
});