feat: add CW callback route and optimize cache refresh workflows

This commit is contained in:
2026-03-03 19:46:48 -06:00
parent 6d935e7180
commit a048e1e824
28 changed files with 3268 additions and 294 deletions
+50
View File
@@ -32,6 +32,56 @@ See [PERMISSIONS.md](PERMISSIONS.md) for the full list of field-level permission
## Authentication Routes ## Authentication Routes
## ConnectWise Callback Routes
### Receive ConnectWise Callback
**POST** `/cw/callback/:secret/:resource`
Receives ConnectWise callback/webhook payloads for supported resources and returns a normalized success response.
**Authentication Required:** No
**Path Parameters:**
- `secret` — Shared callback secret, validated against `CW_CALLBACK_SECRET` when configured
- `resource` — one of: `opportunity`, `ticket`, `company`, `activity`
**Behavior:**
- Parses JSON request body when present.
- Decodes JSON-encoded payload fields such as `Entity`.
- Logs a concise callback summary to console.
**Response:**
```json
{
"status": 200,
"message": "CW callback received.",
"data": {
"resource": "ticket",
"summary": {
"resource": "ticket",
"messageId": "1bec7421-204a-4b30-8b06-465915e9a0f5",
"action": "updated",
"type": "ticket",
"id": 36073,
"memberId": "jroberts",
"entityStatus": "In Progress",
"entitySummary": "Onsite Installation: Rough-In",
"entityUpdatedBy": "cirvine",
"entityLastUpdated": "2026-03-03T21:43:29.903"
},
"bodyParsed": {},
"receivedAt": "2026-03-04T00:00:00.000Z"
},
"successful": true
}
```
---
### Get Authentication URI ### Get Authentication URI
**GET** `/auth/uri` **GET** `/auth/uri`
+127 -76
View File
@@ -11,7 +11,7 @@ The API caches expensive ConnectWise (CW) API responses in **Redis** to reduce l
### Key design principles ### Key design principles
- **Adaptive TTLs** — cache durations are computed dynamically based on how "hot" an opportunity is (recently updated = shorter TTL = fresher data). - **Adaptive TTLs** — cache durations are computed dynamically based on how "hot" an opportunity is (recently updated = shorter TTL = fresher data).
- **Background refresh** — a 30-second interval scans all open opportunities and re-fetches only expired cache keys. - **Background refresh** — a 20-minute interval scans all open opportunities and re-fetches only expired cache keys.
- **Bounded concurrency** — CW API calls are throttled via thunk-based batching to prevent overwhelming the upstream API. - **Bounded concurrency** — CW API calls are throttled via thunk-based batching to prevent overwhelming the upstream API.
- **Graceful degradation** — transient CW errors (timeouts, network failures) are caught, logged, and retried on the next cycle rather than crashing the process. - **Graceful degradation** — transient CW errors (timeouts, network failures) are caught, logged, and retried on the next cycle rather than crashing the process.
- **Priority ordering** — most recently updated opportunities are refreshed first so active deals get fresh data before stale ones. - **Priority ordering** — most recently updated opportunities are refreshed first so active deals get fresh data before stale ones.
@@ -22,15 +22,21 @@ The API caches expensive ConnectWise (CW) API responses in **Redis** to reduce l
Each non-closed opportunity can have up to 7 cached payloads in Redis: Each non-closed opportunity can have up to 7 cached payloads in Redis:
| Cache Key Pattern | Data | Source | | Cache Key Pattern | Data | Source |
|---|---|---| | ----------------------------------- | ------------------------------------ | --------------------------------------------------------------------- |
| `opp:cw-data:{cwOpportunityId}` | Raw CW opportunity response | `GET /sales/opportunities/:id` | | `opp:cw-data:{cwOpportunityId}` | Raw CW opportunity response | `GET /sales/opportunities/:id` |
| `opp:activities:{cwOpportunityId}` | CW activities array | `GET /sales/activities?conditions=opportunity/id=:id` | | `opp:activities:{cwOpportunityId}` | CW activities array | `GET /sales/activities?conditions=opportunity/id=:id` |
| `opp:notes:{cwOpportunityId}` | CW notes array | `GET /sales/opportunities/:id/notes` | | `opp:notes:{cwOpportunityId}` | CW notes array | `GET /sales/opportunities/:id/notes` |
| `opp:contacts:{cwOpportunityId}` | CW contacts array | `GET /sales/opportunities/:id/contacts` | | `opp:contacts:{cwOpportunityId}` | CW contacts array | `GET /sales/opportunities/:id/contacts` |
| `opp:products:{cwOpportunityId}` | Forecast + procurement products blob | `GET /sales/opportunities/:id/forecast` + `GET /procurement/products` | | `opp:products:{cwOpportunityId}` | Forecast + procurement products blob | `GET /sales/opportunities/:id/forecast` + `GET /procurement/products` |
| `opp:company-cw:{cw_CompanyId}` | Hydrated company + contacts blob | `GET /company/companies/:id` + contacts endpoints | | `opp:company-cw:{cw_CompanyId}` | Hydrated company + contacts blob | `GET /company/companies/:id` + contacts endpoints |
| `opp:site:{cwCompanyId}:{cwSiteId}` | Company site data | `GET /company/companies/:id/sites/:siteId` | | `opp:site:{cwCompanyId}:{cwSiteId}` | Company site data | `GET /company/companies/:id/sites/:siteId` |
Inventory-adjustment-driven catalog sync adds a targeted product cache:
| Cache Key Pattern | Data | Source |
| ------------------------ | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------- |
| `catalog:item:cw:{cwId}` | Full CW catalog item + computed `onHand` + DB row snapshot | `GET /procurement/adjustments` + `GET /procurement/catalog/:id` + catalog inventory endpoint |
--- ---
@@ -49,13 +55,13 @@ Three algorithms compute cache TTLs. All share the same input signals:
Used for: opportunity CW data, activities, company CW data. Used for: opportunity CW data, activities, company CW data.
| # | Condition | TTL | Human | | # | Condition | TTL | Human |
|---|---|---|---| | --- | ------------------------------------------------------- | ---------- | ------------ |
| 1a | Closed > 30 days ago | `null` | Do not cache | | 1a | Closed > 30 days ago | `null` | Do not cache |
| 1b | Closed within 30 days | 900,000 ms | 15 minutes | | 1b | Closed within 30 days | 900,000 ms | 15 minutes |
| 2 | `expectedCloseDate` or `lastUpdated` within **5 days** | 30,000 ms | 30 seconds | | 2 | `expectedCloseDate` or `lastUpdated` within **5 days** | 30,000 ms | 30 seconds |
| 3 | `expectedCloseDate` or `lastUpdated` within **14 days** | 60,000 ms | 60 seconds | | 3 | `expectedCloseDate` or `lastUpdated` within **14 days** | 60,000 ms | 60 seconds |
| 4 | Everything else | 900,000 ms | 15 minutes | | 4 | Everything else | 900,000 ms | 15 minutes |
Rules are evaluated top-to-bottom; first match wins. Rules are evaluated top-to-bottom; first match wins.
@@ -65,13 +71,13 @@ Rules are evaluated top-to-bottom; first match wins.
Used for: notes, contacts. Used for: notes, contacts.
| # | Condition | TTL | Human | | # | Condition | TTL | Human |
|---|---|---|---| | --- | --------------------- | ---------- | ------------ |
| 1a | Closed > 30 days ago | `null` | Do not cache | | 1a | Closed > 30 days ago | `null` | Do not cache |
| 1b | Closed within 30 days | 300,000 ms | 5 minutes | | 1b | Closed within 30 days | 300,000 ms | 5 minutes |
| 2 | Within **5 days** | 60,000 ms | 60 seconds | | 2 | Within **5 days** | 60,000 ms | 60 seconds |
| 3 | Within **14 days** | 120,000 ms | 2 minutes | | 3 | Within **14 days** | 120,000 ms | 2 minutes |
| 4 | Everything else | 300,000 ms | 5 minutes | | 4 | Everything else | 300,000 ms | 5 minutes |
### Products TTL (`computeProductsCacheTTL`) ### Products TTL (`computeProductsCacheTTL`)
@@ -79,18 +85,18 @@ Used for: notes, contacts.
Used for: forecast + procurement products. Used for: forecast + procurement products.
| # | Condition | TTL | Human | | # | Condition | TTL | Human |
|---|---|---|---| | --- | ------------------------------------------- | ------------ | ---------- |
| 1 | Status is Won/Lost/Pending Won/Pending Lost | `null` | No cache | | 1 | Status is Won/Lost/Pending Won/Pending Lost | `null` | No cache |
| 2 | Main cache TTL is `null` | `null` | No cache | | 2 | Main cache TTL is `null` | `null` | No cache |
| 3 | `lastUpdated` within **3 days** | 15,000 ms | 15 seconds | | 3 | `lastUpdated` within **3 days** | 15,000 ms | 15 seconds |
| 4 | Everything else | 1,800,000 ms | 30 minutes | | 4 | Everything else | 1,200,000 ms | 20 minutes |
Products on terminal-status opportunities are never proactively cached. Non-hot products use a **lazy on-demand** cache — they're fetched when requested and cached for 30 minutes. Products on terminal-status opportunities are never proactively cached. Non-hot products use a **lazy on-demand** cache — they're fetched when requested and cached for 20 minutes.
### Site TTL ### Site TTL
Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely changes. Sites are **not** proactively warmed by the background refresh — they are populated lazily on the first detail-view request. Sites use a fixed TTL of **20 minutes** (1,200,000 ms). Site/address data rarely changes. Sites are **not** proactively warmed by the background refresh — they are populated lazily on the first detail-view request.
--- ---
@@ -98,7 +104,7 @@ Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely
**Function:** `refreshOpportunityCache()` in `src/modules/cache/opportunityCache.ts` **Function:** `refreshOpportunityCache()` in `src/modules/cache/opportunityCache.ts`
**Interval:** Every 30 seconds, triggered from `src/index.ts`. **Interval:** Every 20 minutes, triggered from `src/index.ts`.
### Refresh cycle ### Refresh cycle
@@ -108,17 +114,61 @@ Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely
4. **Execute with bounded concurrency** — process thunks in batches of `CONCURRENCY` (currently **6**), with a `BATCH_DELAY_MS` (currently **250ms**) pause between batches. Each thunk is only invoked inside the batch loop. 4. **Execute with bounded concurrency** — process thunks in batches of `CONCURRENCY` (currently **6**), with a `BATCH_DELAY_MS` (currently **250ms**) pause between batches. Each thunk is only invoked inside the batch loop.
5. **Emit events** — `cache:opportunities:refresh:started` and `cache:opportunities:refresh:completed` events are emitted for the event debugger. 5. **Emit events** — `cache:opportunities:refresh:started` and `cache:opportunities:refresh:completed` events are emitted for the event debugger.
### Inventory-adjustment listener cycle
**Function:** `listenInventoryAdjustments()` in `src/modules/cw-utils/procurement/listenInventoryAdjustments.ts`
**Interval:** Every 60 seconds, triggered from `src/index.ts`.
1. Fetch `GET /procurement/adjustments?pageSize=1000`.
2. Build a normalized snapshot of tracked inventory rows (`cwCatalogId`, `onHand`, `inventory`) per adjustment.
3. Compare to previous snapshot; extract only changed product IDs.
4. For each changed product ID, fetch fresh CW catalog item + current on-hand.
5. Upsert `CatalogItem` in Postgres and write Redis key `catalog:item:cw:{cwId}` with a 20-minute TTL.
Guardrails to prevent request storms:
- Diffing is computed at **product state** level (grouped by `cwCatalogId`), not raw adjustment-row churn.
- Per-cycle syncs are capped (`CW_ADJUSTMENT_SYNC_MAX_PER_CYCLE`, default `50`).
- Product resync cooldown is enforced (`CW_ADJUSTMENT_SYNC_COOLDOWN_MS`, default `600000` ms / 10 min).
This avoids full-catalog sweeps for small inventory movements and updates only the products implicated by adjustments.
### Full procurement catalog refresh
**Function:** `refreshCatalog()` in `src/modules/cw-utils/procurement/refreshCatalog.ts`
**Interval:** Every 30 minutes, triggered from `src/index.ts`.
The full catalog cache/DB sync uses the same slow-parallel thunk strategy as opportunity cache refreshes:
- Build arrays of thunk tasks (`() => Promise<void>`) for CW item fetches, inventory fetches, and DB upserts.
- Execute with bounded concurrency (`CONCURRENCY=6`).
- Pause between batches (`BATCH_DELAY_MS=250`) to avoid CW burst pressure.
- Log task failures and retry naturally on the next cycle.
This keeps full-catalog refresh conservative while inventory-adjustment listener handles near-real-time targeted updates.
### Full inventory sweep fallback
`refreshInventory()` remains as a safety net but is intentionally infrequent:
- Runs every **6 hours** from `src/index.ts` (no startup-time full sweep).
- Uses the same slow-parallel pattern (`CONCURRENCY=6`, `BATCH_DELAY_MS=250`) to avoid burst traffic.
Most on-hand freshness now comes from the 60-second adjustment listener plus 30-minute full catalog refresh.
### Concurrency control ### Concurrency control
The thunk pattern is critical. Previously, tasks were pushed as already-executing promises (`refreshTasks.push(fetchAndCache(...))`), which meant all HTTP requests fired simultaneously regardless of the batching loop. The fix was changing the array type from `Promise<void>[]` to `(() => Promise<void>)[]` so requests only start when explicitly invoked: `batch.map((fn) => fn())`. The thunk pattern is critical. Previously, tasks were pushed as already-executing promises (`refreshTasks.push(fetchAndCache(...))`), which meant all HTTP requests fired simultaneously regardless of the batching loop. The fix was changing the array type from `Promise<void>[]` to `(() => Promise<void>)[]` so requests only start when explicitly invoked: `batch.map((fn) => fn())`.
### Current tuning ### Current tuning
| Parameter | Value | Effect | | Parameter | Value | Effect |
|---|---|---| | ---------------- | ---------- | ------------------------------------------ |
| `CONCURRENCY` | 6 | Max simultaneous CW API requests per batch | | `CONCURRENCY` | 6 | Max simultaneous CW API requests per batch |
| `BATCH_DELAY_MS` | 250 | Milliseconds between batches | | `BATCH_DELAY_MS` | 250 | Milliseconds between batches |
| Refresh interval | 30 seconds | How often the full sweep runs | | Refresh interval | 20 minutes | How often the full sweep runs |
At these settings, a full sweep of ~500 expired keys completes in ~1-2 minutes with zero CW errors and ~230ms median latency. At these settings, a full sweep of ~500 expired keys completes in ~1-2 minutes with zero CW errors and ~230ms median latency.
@@ -142,11 +192,11 @@ Wraps CW API calls with exponential backoff retry on transient errors.
### Default configuration ### Default configuration
| Parameter | Default | Description | | Parameter | Default | Description |
|---|---|---| | ------------- | ------- | ----------------------------------------------------------- |
| `maxAttempts` | 3 | Total attempts including the first | | `maxAttempts` | 3 | Total attempts including the first |
| `baseDelayMs` | 1,000 | Delay before first retry (doubles each retry: 1s → 2s → 4s) | | `baseDelayMs` | 1,000 | Delay before first retry (doubles each retry: 1s → 2s → 4s) |
| `label` | — | Optional tag for log messages | | `label` | — | Optional tag for log messages |
### Usage ### Usage
@@ -181,16 +231,16 @@ LOG_CW_API=1 bun run dev
### Log entry fields ### Log entry fields
| Field | Type | Description | | Field | Type | Description |
|---|---|---| | ------------ | ----------------- | ----------------------------------- |
| `timestamp` | string (ISO-8601) | When the request completed | | `timestamp` | string (ISO-8601) | When the request completed |
| `method` | string | HTTP method | | `method` | string | HTTP method |
| `url` | string | Request URL (relative or absolute) | | `url` | string | Request URL (relative or absolute) |
| `baseURL` | string | Axios baseURL | | `baseURL` | string | Axios baseURL |
| `status` | number \| null | HTTP status (null on network error) | | `status` | number \| null | HTTP status (null on network error) |
| `durationMs` | number | Wall-clock time in milliseconds | | `durationMs` | number | Wall-clock time in milliseconds |
| `error` | string \| null | Error code + message, if any | | `error` | string \| null | Error code + message, if any |
| `timeout` | number | Configured timeout in ms | | `timeout` | number | Configured timeout in ms |
### Analysis ### Analysis
@@ -229,12 +279,12 @@ rm -rf cw-api-logs/
Mutation endpoints invalidate the relevant cache keys so the next read fetches fresh data from CW: Mutation endpoints invalidate the relevant cache keys so the next read fetches fresh data from CW:
| Mutation | Cache invalidated | | Mutation | Cache invalidated |
|---|---| | ------------------------------ | ---------------------------------------------------------------- |
| Create/update/delete note | `opp:notes:{cwOpportunityId}` via `invalidateNotesCache()` | | Create/update/delete note | `opp:notes:{cwOpportunityId}` via `invalidateNotesCache()` |
| Create/update/delete contact | `opp:contacts:{cwOpportunityId}` via `invalidateContactsCache()` | | Create/update/delete contact | `opp:contacts:{cwOpportunityId}` via `invalidateContactsCache()` |
| Add/update/resequence products | `opp:products:{cwOpportunityId}` via `invalidateProductsCache()` | | Add/update/resequence products | `opp:products:{cwOpportunityId}` via `invalidateProductsCache()` |
| Refresh opportunity | All keys for that opportunity (via re-fetch) | | Refresh opportunity | All keys for that opportunity (via re-fetch) |
--- ---
@@ -242,11 +292,11 @@ Mutation endpoints invalidate the relevant cache keys so the next read fetches f
The shared Axios instance (`connectWiseApi`) is configured in `src/constants.ts`: The shared Axios instance (`connectWiseApi`) is configured in `src/constants.ts`:
| Setting | Value | Purpose | | Setting | Value | Purpose |
|---|---|---| | --------- | ---------------------------------------------------- | ------------------------------ |
| `baseURL` | `https://ttscw.totaltech.net/v4_6_release/apis/3.0/` | CW API base | | `baseURL` | `https://ttscw.totaltech.net/v4_6_release/apis/3.0/` | CW API base |
| `timeout` | 30,000 ms (30s) | Per-request timeout | | `timeout` | 30,000 ms (30s) | Per-request timeout |
| Logger | `attachCwApiLogger()` | Writes to `cw-api-calls.jsonl` | | Logger | `attachCwApiLogger()` | Writes to `cw-api-calls.jsonl` |
--- ---
@@ -255,7 +305,7 @@ The shared Axios instance (`connectWiseApi`) is configured in `src/constants.ts`
``` ```
src/index.ts src/index.ts
├─ setInterval(refreshOpportunityCache, 30s) ├─ setInterval(refreshOpportunityCache, 20m)
└─► src/modules/cache/opportunityCache.ts └─► src/modules/cache/opportunityCache.ts
@@ -283,15 +333,16 @@ src/index.ts
## File reference ## File reference
| File | Purpose | | File | Purpose |
|---|---| | ---------------------------------------------------------------- | ------------------------------------------------------------- |
| `src/modules/cache/opportunityCache.ts` | Cache read/write helpers, background refresh logic | | `src/modules/cache/opportunityCache.ts` | Cache read/write helpers, background refresh logic |
| `src/modules/algorithms/computeCacheTTL.ts` | Primary adaptive TTL algorithm | | `src/modules/algorithms/computeCacheTTL.ts` | Primary adaptive TTL algorithm |
| `src/modules/algorithms/computeSubResourceCacheTTL.ts` | Sub-resource (notes, contacts) TTL algorithm | | `src/modules/algorithms/computeSubResourceCacheTTL.ts` | Sub-resource (notes, contacts) TTL algorithm |
| `src/modules/algorithms/computeProductsCacheTTL.ts` | Products TTL algorithm | | `src/modules/algorithms/computeProductsCacheTTL.ts` | Products TTL algorithm |
| `src/modules/cw-utils/withCwRetry.ts` | Retry wrapper with exponential backoff | | `src/modules/cw-utils/withCwRetry.ts` | Retry wrapper with exponential backoff |
| `src/modules/cw-utils/cwApiLogger.ts` | Axios interceptor for JSONL call logging | | `src/modules/cw-utils/cwApiLogger.ts` | Axios interceptor for JSONL call logging |
| `src/modules/cw-utils/fetchCompany.ts` | Company fetch with retry | | `src/modules/cw-utils/fetchCompany.ts` | Company fetch with retry |
| `src/constants.ts` | CW Axios instance config (timeout, logger) | | `src/modules/cw-utils/procurement/listenInventoryAdjustments.ts` | Adjustment listener for targeted catalog-item cache + DB sync |
| `src/index.ts` | Refresh interval registration | | `src/constants.ts` | CW Axios instance config (timeout, logger) |
| `debug-scripts/analyze-cw-calls.py` | CW API call analysis script | | `src/index.ts` | Refresh interval registration |
| `debug-scripts/analyze-cw-calls.py` | CW API call analysis script |
+8
View File
@@ -124,6 +124,14 @@ Admin-specific UI permissions that control visibility and data loading for admin
| `procurement.catalog.inventory.refresh` | Refresh on-hand inventory for a catalog item from ConnectWise | [src/api/procurement/[id]/refreshInventory.ts](src/api/procurement/[id]/refreshInventory.ts) | `procurement.catalog.fetch` | | `procurement.catalog.inventory.refresh` | Refresh on-hand inventory for a catalog item from ConnectWise | [src/api/procurement/[id]/refreshInventory.ts](src/api/procurement/[id]/refreshInventory.ts) | `procurement.catalog.fetch` |
| `procurement.catalog.link` | Link or unlink catalog items to each other | [src/api/procurement/[id]/link.ts](src/api/procurement/[id]/link.ts), [src/api/procurement/[id]/unlink.ts](src/api/procurement/[id]/unlink.ts) | `procurement.catalog.fetch` | | `procurement.catalog.link` | Link or unlink catalog items to each other | [src/api/procurement/[id]/link.ts](src/api/procurement/[id]/link.ts), [src/api/procurement/[id]/unlink.ts](src/api/procurement/[id]/unlink.ts) | `procurement.catalog.fetch` |
### ConnectWise Callback Routes
`POST /v1/cw/callback/:secret/:resource` is intentionally unauthenticated for inbound ConnectWise callbacks and does **not** require a permission node.
| Permission Node | Description | Used In | Dependencies |
| --------------- | ------------------------------------------------------------------------------- | ------------------------------------------------ | ------------ |
| _None_ | Inbound callback route; secured operationally (network controls / source trust) | [src/api/cw/callback.ts](src/api/cw/callback.ts) | N/A |
### Sales Permissions ### Sales Permissions
Permissions for accessing and managing sales opportunities. Opportunities are synced from ConnectWise and stored locally; sub-resources (products, notes, contacts) are fetched live from CW. Permissions for accessing and managing sales opportunities. Opportunities are synced from ConnectWise and stored locally; sub-resources (products, notes, contacts) are fetched live from CW.
+441
View File
@@ -0,0 +1,441 @@
#!/usr/bin/env python3
import argparse
import json
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
def parse_iso(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized)
except ValueError:
return None
def first_non_empty(*values: Any) -> str:
    """Return the first candidate that is neither None nor a blank string, as str.

    Non-string falsy values (0, False, []) are considered present and are
    stringified; "unknown" is returned when every candidate is missing/blank.
    """
    for candidate in values:
        if candidate is None:
            continue
        if isinstance(candidate, str) and not candidate.strip():
            continue
        return str(candidate)
    return "unknown"
def top_lines(counter: Counter[str], limit: int) -> list[str]:
    """Format the `limit` most common counter entries as "key: count" lines."""
    formatted: list[str] = []
    for key, count in counter.most_common(limit):
        formatted.append(f"{key}: {count}")
    return formatted
def fmt_pct(part: int, total: int) -> str:
    """Render part/total as a one-decimal percentage; zero total yields "0.0%"."""
    if total:
        return f"{100.0 * part / total:.1f}%"
    return "0.0%"
def human_duration(start: datetime | None, end: datetime | None) -> str:
if start is None or end is None:
return "unknown"
delta = end - start
total_seconds = int(delta.total_seconds())
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours}h {minutes}m {seconds}s"
def truncate(value: str, max_len: int = 90) -> str:
    """Clamp `value` to at most `max_len` characters for column display.

    Strings within the limit are returned unchanged; longer strings are cut
    to max_len - 1 characters plus a single ellipsis so the result is exactly
    max_len characters. (The original appended an empty string after slicing
    to max_len - 1 — the truncation marker had clearly been lost.)
    """
    if len(value) <= max_len:
        return value
    # Reserve one slot for the ellipsis so the result never exceeds max_len.
    return value[: max_len - 1] + "…"
def add_section(lines: list[str], title: str) -> None:
    """Append a blank spacer, the title, and a dashed underline to `lines` in place."""
    lines.extend(["", title, "-" * len(title)])
def supports_color(enabled: bool) -> bool:
    """Return the caller's color preference unchanged.

    Kept as a seam so real TTY/NO_COLOR detection can be added later without
    touching call sites.
    """
    return enabled
def paint(text: str, code: str, use_color: bool) -> str:
    """Wrap `text` in the ANSI SGR sequence `code`, or pass through when color is off."""
    return f"\033[{code}m{text}\033[0m" if use_color else text
def good_bad_neutral(value: str, state: str, use_color: bool) -> str:
    """Color `value` green for "good", red for "bad", cyan for anything else."""
    sgr_by_state = {"good": "32", "bad": "31"}
    return paint(value, sgr_by_state.get(state, "36"), use_color)
def add_ranked_counter(
    lines: list[str],
    title: str,
    counter: Counter[str],
    top_n: int,
    total: int,
    truncate_labels: bool = False,
) -> None:
    """Append a ranked " N. label count pct" listing of the counter's top entries.

    Emits "(no data)" under the title when the counter is empty. Labels are
    clipped via truncate() only when `truncate_labels` is set.
    """
    lines.append(f"{title}")
    ranked = counter.most_common(top_n)
    if not ranked:
        lines.append(" (no data)")
        return
    rank = 1
    for key, count in ranked:
        label = truncate(key) if truncate_labels else key
        lines.append(f" {rank:>2}. {label:<90} {count:>4} {fmt_pct(count, total):>6}")
        rank += 1
def stream_row_summary(row: dict[str, Any], use_color: bool, max_path: int) -> str:
    """Render one webhook log row as a single aligned, optionally colored line.

    Every field falls back to "unknown" via first_non_empty; the time column
    shows "--:--:--" when the row has no parseable timestamp.
    """
    request = row.get("request") or {}
    response = row.get("response") or {}
    body = request.get("bodyParsed") or {}
    entity = request.get("entityParsed") or {}
    summary = request.get("summary") or {}
    query = request.get("query", {})
    params = query.get("params", {})

    ts = parse_iso(row.get("timestamp"))
    time_label = ts.astimezone(timezone.utc).strftime("%H:%M:%S") if ts else "--:--:--"

    method = first_non_empty(request.get("method"))
    endpoint = first_non_empty(request.get("path")).split("?", 1)[0]
    status_code = first_non_empty(response.get("status"))
    event_type = first_non_empty(body.get("Type"), summary.get("type"))
    action = first_non_empty(body.get("Action"), summary.get("action"), params.get("action"))
    item_id = first_non_empty(body.get("ID"), summary.get("id"), query.get("inferredId"))
    actor = first_non_empty(params.get("memberId"), summary.get("entityUpdatedBy"), entity.get("UpdatedBy"))
    entity_status = first_non_empty(entity.get("StatusName"), summary.get("entityStatus"))

    # Colorize before padding so the layout matches the original behavior
    # (ANSI escapes count toward the field width when color is enabled).
    status_col = good_bad_neutral(status_code, "good" if status_code.startswith("2") else "bad", use_color)
    event_col = paint(f"{event_type}.{action}", "36", use_color)
    endpoint_col = paint(truncate(endpoint, max_path), "94", use_color)

    return (
        f"[{time_label}] {method:<4} {endpoint_col:<20} "
        f"{status_col:>3} {event_col:<22} "
        f"id={item_id:<7} actor={truncate(actor, 16):<16} status={truncate(entity_status, 22)}"
    )
def endpoint_stream_summary(log_path: Path, use_color: bool, max_path: int) -> str:
    """Stream a JSONL webhook log and render one summary line per parseable row.

    Blank lines are skipped silently; rows that fail JSON decoding are counted
    as invalid and reported in the footer.
    """
    divider = paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color)
    output: list[str] = [paint("ENDPOINT STREAM (chronological)", "1;95", use_color), divider]
    parsed = 0
    invalid = 0
    with log_path.open("r", encoding="utf-8") as handle:
        for raw in handle:
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                row = json.loads(stripped)
            except json.JSONDecodeError:
                invalid += 1
                continue
            output.append(stream_row_summary(row, use_color=use_color, max_path=max_path))
            parsed += 1
    output.append(divider)
    output.append(
        f"events={parsed} invalid={good_bad_neutral(str(invalid), 'good' if invalid == 0 else 'bad', use_color)}"
    )
    return "\n".join(output)
@dataclass
class LogStats:
total_rows: int = 0
parsed_rows: int = 0
invalid_rows: int = 0
earliest: datetime | None = None
latest: datetime | None = None
methods: Counter[str] = None # type: ignore[assignment]
paths: Counter[str] = None # type: ignore[assignment]
endpoint_roots: Counter[str] = None # type: ignore[assignment]
response_statuses: Counter[str] = None # type: ignore[assignment]
event_types: Counter[str] = None # type: ignore[assignment]
actions: Counter[str] = None # type: ignore[assignment]
type_action_combo: Counter[str] = None # type: ignore[assignment]
company_ids: Counter[str] = None # type: ignore[assignment]
source_members: Counter[str] = None # type: ignore[assignment]
actor_members: Counter[str] = None # type: ignore[assignment]
entity_updated_by: Counter[str] = None # type: ignore[assignment]
requests_by_hour: Counter[str] = None # type: ignore[assignment]
requests_by_minute: Counter[str] = None # type: ignore[assignment]
endpoint_by_hour: dict[str, Counter[str]] = None # type: ignore[assignment]
def __post_init__(self) -> None:
self.methods = Counter()
self.paths = Counter()
self.endpoint_roots = Counter()
self.response_statuses = Counter()
self.event_types = Counter()
self.actions = Counter()
self.type_action_combo = Counter()
self.company_ids = Counter()
self.source_members = Counter()
self.actor_members = Counter()
self.entity_updated_by = Counter()
self.requests_by_hour = Counter()
self.requests_by_minute = Counter()
self.endpoint_by_hour = defaultdict(Counter)
def add_timestamp(self, timestamp: datetime | None) -> None:
if timestamp is None:
return
self.earliest = timestamp if self.earliest is None else min(self.earliest, timestamp)
self.latest = timestamp if self.latest is None else max(self.latest, timestamp)
hour_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC")
minute_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
self.requests_by_hour[hour_bucket] += 1
self.requests_by_minute[minute_bucket] += 1
def summarize(self, top_n: int, busiest_n: int, use_color: bool) -> str:
duration_line = human_duration(self.earliest, self.latest)
time_range_line = "unknown"
if self.earliest and self.latest:
time_range_line = f"{self.earliest.isoformat()}{self.latest.isoformat()}"
total_requests = self.parsed_rows
success_count = self.response_statuses.get("200", 0)
success_pct = fmt_pct(success_count, sum(self.response_statuses.values()))
invalid_state = "good" if self.invalid_rows == 0 else "bad"
top_endpoints = self.endpoint_roots.most_common(2)
top_users = self.actor_members.most_common(3)
top_minutes = self.requests_by_minute.most_common(busiest_n)
lines: list[str] = []
lines.append(paint("WEBHOOK SNAPSHOT", "1;95", use_color))
lines.append(paint("────────────────────────────────────────────────────────", "90", use_color))
lines.append(
" "
+ paint("Rows", "1;97", use_color)
+ f": {self.total_rows:<4} "
+ paint("Parsed", "1;97", use_color)
+ f": {self.parsed_rows:<4} "
+ paint("Invalid", "1;97", use_color)
+ f": {good_bad_neutral(str(self.invalid_rows), invalid_state, use_color)}"
)
lines.append(
" "
+ paint("Window", "1;97", use_color)
+ f": {duration_line:<12} "
+ paint("Success", "1;97", use_color)
+ f": {good_bad_neutral(success_pct, 'good' if success_count else 'neutral', use_color)}"
)
lines.append(" " + paint("UTC Range", "1;97", use_color) + f": {time_range_line}")
lines.append("")
lines.append(paint("Top Endpoints", "1;94", use_color))
if top_endpoints:
for endpoint, count in top_endpoints:
lines.append(f"{endpoint:<14} {count:>4} ({fmt_pct(count, total_requests)})")
if not top_endpoints:
lines.append(" • (no data)")
lines.append("")
lines.append(paint("Most Active Users (query memberId)", "1;94", use_color))
if top_users:
for user, count in top_users:
lines.append(f"{user:<18} {count:>4} ({fmt_pct(count, total_requests)})")
if not top_users:
lines.append(" • (no data)")
lines.append("")
lines.append(paint("Busiest Minutes", "1;94", use_color))
if top_minutes:
for minute, count in top_minutes:
lines.append(f"{minute:<22} {count:>3}")
if not top_minutes:
lines.append(" • (no data)")
lines.append("")
lines.append(paint("Request Mix", "1;94", use_color))
method_line = ", ".join([f"{k}:{v}" for k, v in self.methods.most_common(3)]) or "(no data)"
event_line = ", ".join([f"{k}:{v}" for k, v in self.event_types.most_common(3)]) or "(no data)"
action_line = ", ".join([f"{k}:{v}" for k, v in self.actions.most_common(3)]) or "(no data)"
lines.append(f" • Methods : {method_line}")
lines.append(f" • Types : {event_line}")
lines.append(f" • Actions : {action_line}")
lines.append("")
lines.append(paint("Status Codes", "1;94", use_color))
if self.response_statuses:
status_total = sum(self.response_statuses.values())
for status, count in self.response_statuses.most_common(5):
state = "good" if status.startswith("2") else "bad"
status_label = good_bad_neutral(status, state, use_color)
lines.append(f"{status_label}: {count} ({fmt_pct(count, status_total)})")
if not self.response_statuses:
lines.append(" • (no data)")
return "\n".join(lines)
def update_stats(stats: LogStats, row: dict[str, Any]) -> None:
    """Fold one parsed JSONL row into the running LogStats counters.

    Pulls identifying fields out of the row's ``request``/``response``
    sections (falling back across bodyParsed / summary / query params via
    ``first_non_empty``) and bumps the corresponding counters on ``stats``.
    """
    ts = parse_iso(row.get("timestamp"))
    stats.add_timestamp(ts)

    request = row.get("request") or {}
    response = row.get("response") or {}
    body = request.get("bodyParsed") or {}
    entity = request.get("entityParsed") or {}
    summary = request.get("summary", {})
    query_params = request.get("query", {}).get("params", {})

    method = first_non_empty(request.get("method"))
    path = first_non_empty(request.get("path"))
    # Endpoint root is the path with any query string stripped.
    endpoint_root = path.split("?", 1)[0]
    status = first_non_empty(response.get("status"))

    event_type = first_non_empty(body.get("Type"), summary.get("type"))
    action = first_non_empty(
        body.get("Action"),
        summary.get("action"),
        query_params.get("action"),
    )
    source_member = first_non_empty(body.get("MemberId"), summary.get("memberId"))
    actor_member = first_non_empty(
        query_params.get("memberId"),
        summary.get("entityUpdatedBy"),
    )
    updated_by = first_non_empty(
        entity.get("UpdatedBy"),
        summary.get("entityUpdatedBy"),
    )
    company_id = first_non_empty(
        body.get("CompanyId"),
        request.get("headers", {}).get("companyname"),
    )

    stats.methods[method] += 1
    stats.paths[path] += 1
    stats.endpoint_roots[endpoint_root] += 1
    stats.response_statuses[status] += 1
    stats.event_types[event_type] += 1
    stats.actions[action] += 1
    stats.type_action_combo[f"{event_type}:{action}"] += 1
    stats.company_ids[company_id] += 1
    stats.source_members[source_member] += 1
    stats.actor_members[actor_member] += 1
    stats.entity_updated_by[updated_by] += 1

    # Per-endpoint hourly histogram, bucketed in UTC.
    if ts:
        hour_bucket = ts.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC")
        stats.endpoint_by_hour[endpoint_root][hour_bucket] += 1
def analyze_file(log_path: Path) -> LogStats:
    """Scan a JSONL log file line by line and accumulate statistics.

    Blank lines are skipped entirely; lines that are not valid JSON are
    counted as invalid rows but do not abort the scan.
    """
    stats = LogStats()
    with log_path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if not stripped:
                continue
            stats.total_rows += 1
            try:
                parsed = json.loads(stripped)
            except json.JSONDecodeError:
                stats.invalid_rows += 1
            else:
                stats.parsed_rows += 1
                update_stats(stats, parsed)
    return stats
def main() -> None:
    """CLI entry point: parse arguments, analyze the log, print a report."""
    parser = argparse.ArgumentParser(
        description="Analyze webhook JSONL logs by users, time, and request types."
    )
    parser.add_argument("log_file", help="Path to JSONL log file")
    parser.add_argument(
        "--top",
        type=int,
        default=10,
        help="Top N entries per section (default: 10)",
    )
    parser.add_argument(
        "--busiest-minutes",
        type=int,
        default=5,
        help="How many top minute buckets to show (default: 5)",
    )
    parser.add_argument("--no-color", action="store_true", help="Disable ANSI colors")
    parser.add_argument(
        "--endpoint-stream",
        action="store_true",
        help="Show chronological one-line summary per webhook, similar to live test webserver logs",
    )
    parser.add_argument(
        "--max-path",
        type=int,
        default=18,
        help="Max endpoint width in stream mode before truncation (default: 18)",
    )
    args = parser.parse_args()

    log_path = Path(args.log_file)
    if not log_path.exists() or not log_path.is_file():
        raise SystemExit(f"Log file not found: {log_path}")

    use_color = supports_color(not args.no_color)

    # Stream mode prints one line per webhook and skips the aggregate report.
    if args.endpoint_stream:
        print(
            endpoint_stream_summary(
                log_path, use_color=use_color, max_path=max(args.max_path, 10)
            )
        )
        return

    report = analyze_file(log_path).summarize(
        top_n=max(args.top, 1),
        busiest_n=max(args.busiest_minutes, 1),
        use_color=use_color,
    )
    print(report)


if __name__ == "__main__":
    main()
+2
View File
@@ -28,6 +28,8 @@
"utils:gen_private_keys": "bun ./utils/genPrivateKeys", "utils:gen_private_keys": "bun ./utils/genPrivateKeys",
"utils:create_admin_role": "bun ./utils/createAdminRole", "utils:create_admin_role": "bun ./utils/createAdminRole",
"utils:assign_user_role": "bun ./utils/assignUserRole", "utils:assign_user_role": "bun ./utils/assignUserRole",
"utils:test_webserver": "bun ./utils/testWebserver.ts",
"utils:test_adjustments_poll": "bun ./utils/testAdjustmentsPoll.ts",
"utils:analyze_cw": "python3 debug-scripts/analyze-cw-calls.py", "utils:analyze_cw": "python3 debug-scripts/analyze-cw-calls.py",
"db:check": "bunx prisma migrate diff --from-migrations prisma/migrations --to-schema prisma/schema.prisma --shadow-database-url $DATABASE_URL --exit-code" "db:check": "bunx prisma migrate diff --from-migrations prisma/migrations --to-schema prisma/schema.prisma --shadow-database-url $DATABASE_URL --exit-code"
}, },
+164
View File
@@ -0,0 +1,164 @@
import { createRoute } from "../../modules/api-utils/createRoute";
import { apiResponse } from "../../modules/api-utils/apiResponse";
import { ContentfulStatusCode } from "hono/utils/http-status";
import { z } from "zod";
import GenericError from "../../Errors/GenericError";
/** Any successfully-parsed JSON container (object or array). */
type ParsedJson = Record<string, unknown> | unknown[];

/** Resource segments this callback route accepts in its path. */
const callbackResource = z.enum([
  "opportunity",
  "ticket",
  "company",
  "activity",
]);
/**
 * Parse a string as JSON, returning the result only when it is a JSON
 * container (object or array). Scalars, `null`, and malformed input all
 * yield `null` rather than throwing.
 */
const safeParseJson = (value: string): ParsedJson | null => {
  try {
    const decoded: unknown = JSON.parse(value);
    if (decoded !== null && typeof decoded === "object") {
      return decoded as ParsedJson;
    }
    return null;
  } catch {
    return null;
  }
};
/** Narrow a parsed JSON value to a plain object; arrays and null become null. */
const asObject = (value: ParsedJson | null): Record<string, unknown> | null =>
  value && !Array.isArray(value) ? value : null;
/**
 * Expand JSON-encoded string fields (e.g. CW's `Entity`) into real values.
 *
 * CW callbacks deliver some payload fields as JSON serialized into strings.
 * For every string value that looks like a JSON object/array, attempt to
 * parse it; on failure — or for plain strings — the original value is kept
 * untouched. Returns `null` for a `null` input so callers can chain on a
 * missing body.
 */
const parseJsonStringFields = (
  value: Record<string, unknown> | null,
): Record<string, unknown> | null => {
  if (!value) return null;
  return Object.entries(value).reduce<Record<string, unknown>>(
    (acc, [key, current]) => {
      if (typeof current !== "string") {
        acc[key] = current;
        return acc;
      }
      // Tolerate leading whitespace when sniffing for JSON — JSON.parse
      // accepts it, so fields padded by the sender still get decoded.
      const head = current.trimStart();
      const looksLikeJson = head.startsWith("{") || head.startsWith("[");
      if (!looksLikeJson) {
        acc[key] = current;
        return acc;
      }
      // Fall back to the raw string when the field is not valid JSON.
      acc[key] = safeParseJson(current) ?? current;
      return acc;
    },
    {},
  );
};
/**
 * Coerce an unknown payload field into parsed JSON: strings are decoded
 * (object/array results only), existing objects pass through, everything
 * else yields null.
 */
const parseEntity = (value: unknown): ParsedJson | null => {
  if (typeof value === "string") {
    // Same contract as safeParseJson, inlined: only JSON containers count.
    try {
      const decoded: unknown = JSON.parse(value);
      return decoded !== null && typeof decoded === "object"
        ? (decoded as ParsedJson)
        : null;
    } catch {
      return null;
    }
  }
  return value !== null && typeof value === "object"
    ? (value as ParsedJson)
    : null;
};
/**
 * Build a compact, null-safe summary of a callback payload for logging and
 * the API response. Returns null when no parsed body is available.
 * Entity fields fall back across the known CW field-name variants.
 */
const buildSummary = (
  resource: z.infer<typeof callbackResource>,
  parsedBody: Record<string, unknown> | null,
  parsedEntity: Record<string, unknown> | null,
) => {
  if (!parsedBody) return null;

  const entityStatus =
    parsedEntity?.StatusName ??
    parsedEntity?.TicketStatus ??
    parsedEntity?.Status ??
    null;
  const entityLastUpdated =
    parsedEntity?.LastUpdatedUTC ?? parsedEntity?.LastUpdated ?? null;

  return {
    resource,
    messageId: parsedBody.MessageId ?? null,
    action: parsedBody.Action ?? null,
    type: parsedBody.Type ?? null,
    id: parsedBody.ID ?? null,
    memberId: parsedBody.MemberId ?? null,
    entityStatus,
    entitySummary: parsedEntity?.Summary ?? parsedEntity?.CompanyName ?? null,
    entityUpdatedBy: parsedEntity?.UpdatedBy ?? null,
    entityLastUpdated,
  };
};
/** Flatten a Headers instance into a plain record (keys are lowercased by Headers). */
const parseHeaders = (headers: Headers): Record<string, string> => {
  const out: Record<string, string> = {};
  headers.forEach((value, key) => {
    out[key] = value;
  });
  return out;
};
/**
 * Extract the handful of headers worth logging for a callback, with the
 * request id resolved from the first matching correlation header.
 */
const callbackHeaderSummary = (headers: Record<string, string>) => {
  const pick = (name: string): string | null => headers[name] ?? null;
  return {
    contentType: pick("content-type"),
    userAgent: pick("user-agent"),
    host: pick("host"),
    forwardedFor: pick("x-forwarded-for"),
    callbackId:
      pick("x-cw-request-id") ??
      pick("x-request-id") ??
      pick("x-correlation-id"),
  };
};
/* POST /v1/cw/callback/:secret/:resource */
export default createRoute("post", ["/callback/:secret/:resource"], async (c) => {
  // ── Shared-secret check ─────────────────────────────────────────────
  // When CW_CALLBACK_SECRET is configured the path secret must match;
  // otherwise accept with a warning so local/dev setups keep working.
  const suppliedSecret = c.req.param("secret");
  const expectedSecret = process.env.CW_CALLBACK_SECRET;
  if (expectedSecret && suppliedSecret !== expectedSecret) {
    throw new GenericError({
      name: "Unauthorized",
      message: "Invalid callback secret.",
      cause: "Path secret mismatch",
      status: 401,
    });
  }
  if (!expectedSecret) {
    console.warn(
      "[cw-callback] CW_CALLBACK_SECRET is not configured; accepting path secret without verification",
    );
  }

  // Validate the resource segment up front. safeParse (instead of parse)
  // turns an unsupported resource into a clean 400 rather than letting a
  // raw ZodError bubble up as an unhandled 500.
  const resourceResult = callbackResource.safeParse(c.req.param("resource"));
  if (!resourceResult.success) {
    throw new GenericError({
      name: "BadRequest",
      message: "Unsupported callback resource.",
      cause: `Resource must be one of: ${callbackResource.options.join(", ")}`,
      status: 400,
    });
  }
  const resource = resourceResult.data;

  // Capture all headers plus a condensed view (request id, forwarding, …).
  const headers = parseHeaders(c.req.raw.headers);
  const headerSummary = callbackHeaderSummary(headers);

  // CW posts JSON bodies containing nested JSON-encoded fields (e.g.
  // `Entity`). Decode defensively — malformed bodies are tolerated, not
  // rejected, so CW never sees a failure for a cosmetic payload issue.
  const rawBody = await c.req.text();
  const parsedJson = safeParseJson(rawBody);
  const parsedBody = asObject(parsedJson);
  const parsedBodyExpanded = parseJsonStringFields(parsedBody);
  const parsedEntity = asObject(parseEntity(parsedBodyExpanded?.Entity));
  const summary = buildSummary(resource, parsedBodyExpanded, parsedEntity);

  // One concise log line per callback for quick grepping.
  const line = [
    `[cw-callback] resource=${resource}`,
    `action=${String(summary?.action ?? "-")}`,
    `type=${String(summary?.type ?? "-")}`,
    `id=${String(summary?.id ?? "-")}`,
    `by=${String(summary?.entityUpdatedBy ?? summary?.memberId ?? "-")}`,
    `requestId=${String(headerSummary.callbackId ?? "-")}`,
    `status=${String(summary?.entityStatus ?? "-")}`,
    `summary=${String(summary?.entitySummary ?? "-")}`,
  ].join(" ");
  console.log(line);

  const response = apiResponse.successful("CW callback received.", {
    resource,
    secretValidated: Boolean(expectedSecret),
    summary,
    headers,
    headerSummary,
    bodyParsed: parsedBodyExpanded,
    receivedAt: new Date().toISOString(),
  });
  return c.json(response, response.status as ContentfulStatusCode);
});
+3
View File
@@ -0,0 +1,3 @@
import { default as callback } from "./callback";
export { callback };
+7
View File
@@ -0,0 +1,7 @@
import { Hono } from "hono";
import * as cwRoutes from "../cw";
const cwRouter = new Hono();
// Mount every exported CW route at the router root. forEach (not map) —
// the callback is run purely for its side effect, so building and
// discarding an array of results is misleading.
Object.values(cwRoutes).forEach((r) => cwRouter.route("/", r));
export default cwRouter;
+169 -25
View File
@@ -4,12 +4,27 @@ import { apiResponse } from "../../../modules/api-utils/apiResponse";
import { ContentfulStatusCode } from "hono/utils/http-status"; import { ContentfulStatusCode } from "hono/utils/http-status";
import { authMiddleware } from "../../middleware/authorization"; import { authMiddleware } from "../../middleware/authorization";
import { processObjectValuePerms } from "../../../modules/permission-utils/processObjectPermissions"; import { processObjectValuePerms } from "../../../modules/permission-utils/processObjectPermissions";
import GenericError from "../../../Errors/GenericError";
import { prisma } from "../../../constants";
import { computeSubResourceCacheTTL } from "../../../modules/algorithms/computeSubResourceCacheTTL";
import { computeProductsCacheTTL } from "../../../modules/algorithms/computeProductsCacheTTL";
import {
getCachedSite,
getCachedNotes,
getCachedContacts,
getCachedProducts,
fetchAndCacheNotes,
fetchAndCacheContacts,
fetchAndCacheProducts,
fetchAndCacheSite,
} from "../../../modules/cache/opportunityCache";
/* GET /v1/sales/opportunities/:identifier?include=notes,contacts,products */ /* GET /v1/sales/opportunities/:identifier?include=notes,contacts,products */
export default createRoute( export default createRoute(
"get", "get",
["/opportunities/:identifier"], ["/opportunities/:identifier"],
async (c) => { async (c) => {
const t0 = performance.now();
const identifier = c.req.param("identifier"); const identifier = c.req.param("identifier");
const includeParam = c.req.query("include") ?? ""; const includeParam = c.req.query("include") ?? "";
const includes = new Set( const includes = new Set(
@@ -19,46 +34,175 @@ export default createRoute(
.filter(Boolean), .filter(Boolean),
); );
const item = await opportunities.fetchItem(identifier); // ── Quick DB lookup (≈3ms) to get cwOpportunityId for pre-warming ──
const isNumeric = /^\d+$/.test(identifier);
const dbRecord = await prisma.opportunity.findFirst({
where: isNumeric
? { cwOpportunityId: Number(identifier) }
: { id: identifier },
select: {
cwOpportunityId: true,
companyCwId: true,
siteCwId: true,
closedFlag: true,
closedDate: true,
expectedCloseDate: true,
cwLastUpdated: true,
statusCwId: true,
},
});
// Eagerly load site data so toJson() includes full site info if (!dbRecord) {
await item.fetchSite(); throw new GenericError({
message: "Opportunity not found",
name: "OpportunityNotFound",
cause: `No opportunity exists with identifier '${identifier}'`,
status: 404,
});
}
// Compute TTLs from DB state
const subTtl = computeSubResourceCacheTTL({
closedFlag: dbRecord.closedFlag,
closedDate: dbRecord.closedDate,
expectedCloseDate: dbRecord.expectedCloseDate,
lastUpdated: dbRecord.cwLastUpdated,
});
const prodTtl = computeProductsCacheTTL({
closedFlag: dbRecord.closedFlag,
closedDate: dbRecord.closedDate,
expectedCloseDate: dbRecord.expectedCloseDate,
lastUpdated: dbRecord.cwLastUpdated,
statusCwId: dbRecord.statusCwId,
});
// ── Pre-warm sub-resources only on cache miss ───────────────────────
// Check Redis first — if the background refresh has kept the keys warm,
// skip the CW calls entirely. Only fetch-and-cache on a miss.
const cwOppId = dbRecord.cwOpportunityId;
const _pw0 = performance.now();
const _wrapPw = (label: string, p: Promise<any>) =>
p
.then((r) => {
console.log(
`[PERF:prewarm] ${label}: ${(performance.now() - _pw0).toFixed(0)}ms`,
);
return r;
})
.catch(() => {});
const prewarmPromises: Promise<any>[] = [];
if (dbRecord.companyCwId && dbRecord.siteCwId) {
const compId = dbRecord.companyCwId,
siteId = dbRecord.siteCwId;
prewarmPromises.push(
_wrapPw(
"site",
getCachedSite(compId, siteId).then(
(c) => c ?? fetchAndCacheSite(compId, siteId),
),
),
);
}
if (includes.has("notes") && subTtl)
prewarmPromises.push(
_wrapPw(
"notes",
getCachedNotes(cwOppId).then(
(c) => c ?? fetchAndCacheNotes(cwOppId, subTtl),
),
),
);
if (includes.has("contacts") && subTtl)
prewarmPromises.push(
_wrapPw(
"contacts",
getCachedContacts(cwOppId).then(
(c) => c ?? fetchAndCacheContacts(cwOppId, subTtl),
),
),
);
if (includes.has("products") && prodTtl)
prewarmPromises.push(
_wrapPw(
"products",
getCachedProducts(cwOppId).then(
(c) => c ?? fetchAndCacheProducts(cwOppId, prodTtl),
),
),
);
// fetchItem runs its own CW calls (opp, activities, company) —
// these execute concurrently with the sub-resource pre-warming above.
const [item] = await Promise.all([
opportunities.fetchItem(identifier),
...prewarmPromises,
]);
const t1 = performance.now();
console.log(`[PERF] fetchItem + prewarm: ${(t1 - t0).toFixed(0)}ms`);
// Sub-resources now hit warm Redis cache (near-instant)
const _st = performance.now();
const _wrapTimed = (label: string, p: Promise<any>) =>
p.then((r) => {
console.log(
`[PERF:sub] ${label}: ${(performance.now() - _st).toFixed(0)}ms`,
);
return r;
});
const subResourcePromises: Record<string, Promise<any>> = {
_site: _wrapTimed("site", item.fetchSite()),
};
if (includes.has("notes")) {
subResourcePromises.notes = _wrapTimed("notes", item.fetchNotes());
}
if (includes.has("contacts")) {
subResourcePromises.contacts = _wrapTimed(
"contacts",
item.fetchContacts(),
);
}
if (includes.has("products")) {
subResourcePromises.products = _wrapTimed(
"products",
item
.fetchProducts()
.then((products) => products.map((p) => p.toJson())),
);
}
const keys = Object.keys(subResourcePromises);
const results = await Promise.all(keys.map((k) => subResourcePromises[k]));
const t2 = performance.now();
console.log(
`[PERF] sub-resources (${keys.join(",")}): ${(t2 - t1).toFixed(0)}ms`,
);
// Apply toJson after site is hydrated (side-effect from fetchSite)
const gatedData = await processObjectValuePerms( const gatedData = await processObjectValuePerms(
item.toJson(), item.toJson(),
"obj.opportunity", "obj.opportunity",
c.get("user"), c.get("user"),
); );
const t3 = performance.now();
console.log(`[PERF] processObjectValuePerms: ${(t3 - t2).toFixed(0)}ms`);
// Fetch requested sub-resources in parallel // Attach sub-resources (skip the internal _site key)
const subResourcePromises: Record<string, Promise<any>> = {}; keys.forEach((k, i) => {
if (includes.has("notes")) { if (k !== "_site") {
subResourcePromises.notes = item.fetchNotes();
}
if (includes.has("contacts")) {
subResourcePromises.contacts = item.fetchContacts();
}
if (includes.has("products")) {
subResourcePromises.products = item
.fetchProducts()
.then((products) => products.map((p) => p.toJson()));
}
const keys = Object.keys(subResourcePromises);
if (keys.length > 0) {
const results = await Promise.all(
keys.map((k) => subResourcePromises[k]),
);
keys.forEach((k, i) => {
(gatedData as any)[k] = results[i]; (gatedData as any)[k] = results[i];
}); }
} });
const response = apiResponse.successful( const response = apiResponse.successful(
"Opportunity fetched successfully!", "Opportunity fetched successfully!",
gatedData, gatedData,
); );
console.log(
`[PERF] total handler: ${(performance.now() - t0).toFixed(0)}ms (includes=${includeParam || "none"})`,
);
return c.json(response, response.status as ContentfulStatusCode); return c.json(response, response.status as ContentfulStatusCode);
}, },
authMiddleware({ permissions: ["sales.opportunity.fetch"] }), authMiddleware({ permissions: ["sales.opportunity.fetch"] }),
+1
View File
@@ -57,6 +57,7 @@ v1.route("/permissions", require("./routers/permissionRouter").default);
v1.route("/unifi", require("./routers/unifiRouter").default); v1.route("/unifi", require("./routers/unifiRouter").default);
v1.route("/procurement", require("./routers/procurementRouter").default); v1.route("/procurement", require("./routers/procurementRouter").default);
v1.route("/sales", require("./routers/salesRouter").default); v1.route("/sales", require("./routers/salesRouter").default);
v1.route("/cw", require("./routers/cwRouter").default);
app.route("/v1", v1); app.route("/v1", v1);
export default app; export default app;
+9 -6
View File
@@ -50,18 +50,21 @@ export class CompanyController {
const cwCompany = await fetchCwCompanyById(this.cw_CompanyId); const cwCompany = await fetchCwCompanyById(this.cw_CompanyId);
if (!cwCompany) return this; if (!cwCompany) return this;
const contactHref = cwCompany.defaultContact?._info?.contact_href;
const defaultContactData = contactHref
? await connectWiseApi.get(contactHref)
: undefined;
const allContactsData = await connectWiseApi.get( const allContactsData = await connectWiseApi.get(
`${cwCompany._info.contacts_href}&pageSize=1000`, `${cwCompany._info.contacts_href}&pageSize=1000`,
); );
// Derive default contact from allContacts instead of a separate CW call
const defaultContactId = cwCompany.defaultContact?.id;
const defaultContactData = defaultContactId
? ((allContactsData.data as any[]).find(
(c: any) => c.id === defaultContactId,
) ?? null)
: null;
this.cw_Data = { this.cw_Data = {
company: cwCompany, company: cwCompany,
defaultContact: defaultContactData?.data ?? null, defaultContact: defaultContactData,
allContacts: allContactsData.data, allContacts: allContactsData.data,
}; };
+23 -13
View File
@@ -15,7 +15,10 @@ import {
CWOpportunity, CWOpportunity,
CWOpportunityNote, CWOpportunityNote,
} from "../modules/cw-utils/opportunities/opportunity.types"; } from "../modules/cw-utils/opportunities/opportunity.types";
import { resolveMember } from "../modules/cw-utils/members/memberCache"; import {
resolveMember,
resolveMembers,
} from "../modules/cw-utils/members/memberCache";
import { ForecastProductController } from "./ForecastProductController"; import { ForecastProductController } from "./ForecastProductController";
import GenericError from "../Errors/GenericError"; import GenericError from "../Errors/GenericError";
import { computeSubResourceCacheTTL } from "../modules/algorithms/computeSubResourceCacheTTL"; import { computeSubResourceCacheTTL } from "../modules/algorithms/computeSubResourceCacheTTL";
@@ -429,18 +432,25 @@ export class OpportunityController {
/** Serialize raw CW note data into the API response shape. */ /** Serialize raw CW note data into the API response shape. */
private async _serializeNotes(notes: any[]) { private async _serializeNotes(notes: any[]) {
return Promise.all( // Batch-resolve all member identifiers in a single DB query
notes.map(async (n: any) => ({ const identifiers = notes
id: n.id, .map((n: any) => n.enteredBy as string)
text: n.text, .filter(Boolean);
type: n.type ? { id: n.type.id, name: n.type.name } : null, const memberMap = await resolveMembers(identifiers);
flagged: n.flagged,
dateEntered: n._info?.lastUpdated return notes.map((n: any) => ({
? new Date(n._info.lastUpdated) id: n.id,
: null, text: n.text,
enteredBy: await resolveMember(n.enteredBy), type: n.type ? { id: n.type.id, name: n.type.name } : null,
})), flagged: n.flagged,
); dateEntered: n._info?.lastUpdated ? new Date(n._info.lastUpdated) : null,
enteredBy: memberMap.get(n.enteredBy) ?? {
id: null,
identifier: n.enteredBy,
name: n.enteredBy,
cwMemberId: null,
},
}));
} }
/** /**
+24 -11
View File
@@ -12,6 +12,7 @@ import { unifiSites } from "./managers/unifiSites";
import { refreshCompanies } from "./modules/cw-utils/refreshCompanies"; import { refreshCompanies } from "./modules/cw-utils/refreshCompanies";
import { refreshCatalog } from "./modules/cw-utils/procurement/refreshCatalog"; import { refreshCatalog } from "./modules/cw-utils/procurement/refreshCatalog";
import { refreshInventory } from "./modules/cw-utils/procurement/refreshInventory"; import { refreshInventory } from "./modules/cw-utils/procurement/refreshInventory";
import { listenInventoryAdjustments } from "./modules/cw-utils/procurement/listenInventoryAdjustments";
import { refreshOpportunities } from "./modules/cw-utils/opportunities/refreshOpportunities"; import { refreshOpportunities } from "./modules/cw-utils/opportunities/refreshOpportunities";
import { refreshOpportunityCache } from "./modules/cache/opportunityCache"; import { refreshOpportunityCache } from "./modules/cache/opportunityCache";
import { refreshCwIdentifiers } from "./modules/cw-utils/members/refreshCwIdentifiers"; import { refreshCwIdentifiers } from "./modules/cw-utils/members/refreshCwIdentifiers";
@@ -105,25 +106,37 @@ setInterval(() => {
); );
}, 60 * 1000); }, 60 * 1000);
// Refresh the internal catalog every minute // Refresh the internal catalog every 30 minutes
await safeStartup("refreshCatalog", refreshCatalog); await safeStartup("refreshCatalog", refreshCatalog);
setInterval(() => { setInterval(
return refreshCatalog().catch((err) => () => {
console.error(`[interval] refreshCatalog failed: ${briefErr(err)}`), return refreshCatalog().catch((err) =>
); console.error(`[interval] refreshCatalog failed: ${briefErr(err)}`),
}, 60 * 1000); );
},
30 * 60 * 1000,
);
// Refresh inventory on hand every 2 minutes // Fallback full inventory sweep every 6 hours (listener handles real-time deltas)
await safeStartup("refreshInventory", refreshInventory);
setInterval( setInterval(
() => { () => {
return refreshInventory().catch((err) => return refreshInventory().catch((err) =>
console.error(`[interval] refreshInventory failed: ${briefErr(err)}`), console.error(`[interval] refreshInventory failed: ${briefErr(err)}`),
); );
}, },
2 * 60 * 1000, 6 * 60 * 60 * 1000,
); );
// Listen for procurement adjustment changes and sync changed products to DB + cache
await safeStartup("listenInventoryAdjustments", listenInventoryAdjustments);
setInterval(() => {
return listenInventoryAdjustments().catch((err) =>
console.error(
`[interval] listenInventoryAdjustments failed: ${briefErr(err)}`,
),
);
}, 60 * 1000);
// Refresh opportunities every minute // Refresh opportunities every minute
await safeStartup("refreshOpportunities", refreshOpportunities); await safeStartup("refreshOpportunities", refreshOpportunities);
setInterval(() => { setInterval(() => {
@@ -132,7 +145,7 @@ setInterval(() => {
); );
}, 60 * 1000); }, 60 * 1000);
// Refresh opportunity CW cache every 30 seconds (activities + company hydration) // Refresh opportunity CW cache every 20 minutes (activities + company hydration)
// NOTE: Do NOT await — register the interval immediately so the cache refresh // NOTE: Do NOT await — register the interval immediately so the cache refresh
// is never blocked by a slow/stuck startup task above. // is never blocked by a slow/stuck startup task above.
safeStartup("refreshOpportunityCache", refreshOpportunityCache); safeStartup("refreshOpportunityCache", refreshOpportunityCache);
@@ -142,7 +155,7 @@ setInterval(() => {
`[interval] refreshOpportunityCache failed: ${briefErr(err)}`, `[interval] refreshOpportunityCache failed: ${briefErr(err)}`,
); );
}); });
}, 30 * 1000); }, 20 * 60 * 1000);
// Refresh User Defined Fields every 5 minutes // Refresh User Defined Fields every 5 minutes
await safeStartup("refreshUDFs", () => userDefinedFieldsCw.refresh()); await safeStartup("refreshUDFs", () => userDefinedFieldsCw.refresh());
+73 -34
View File
@@ -42,16 +42,22 @@ async function buildCompanyController(
ttlMs?: number; ttlMs?: number;
}, },
): Promise<CompanyController> { ): Promise<CompanyController> {
const _ct0 = performance.now();
const strategy = opts?.strategy ?? "cache-then-cw"; const strategy = opts?.strategy ?? "cache-then-cw";
const ctrl = new CompanyController(company); const ctrl = new CompanyController(company);
// ── cw-first: always fetch from CW ────────────────────────────────── // ── cw-first: always fetch from CW (and cache the result) ──────────
if (strategy === "cw-first") { if (strategy === "cw-first") {
await ctrl.hydrateCwData(); const blob = opts?.ttlMs
if (ctrl.cw_Data && opts?.ttlMs) { ? await fetchAndCacheCompanyCwData(
await fetchAndCacheCompanyCwData(company.cw_CompanyId, opts.ttlMs).catch( company.cw_CompanyId,
() => {}, opts.ttlMs,
); ).catch(() => null)
: null;
if (blob) {
ctrl.cw_Data = blob;
} else {
await ctrl.hydrateCwData();
} }
return ctrl; return ctrl;
} }
@@ -66,14 +72,20 @@ async function buildCompanyController(
// cache-only stops here — return the bare DB-backed controller // cache-only stops here — return the bare DB-backed controller
if (strategy === "cache-only") return ctrl; if (strategy === "cache-only") return ctrl;
// cache-then-cw: cache miss — fall through to CW // cache-then-cw: cache miss — fetch from CW once and cache in one pass
await ctrl.hydrateCwData(); if (opts?.ttlMs) {
if (ctrl.cw_Data && opts?.ttlMs) { const blob = await fetchAndCacheCompanyCwData(
await fetchAndCacheCompanyCwData(company.cw_CompanyId, opts.ttlMs).catch( company.cw_CompanyId,
() => {}, opts.ttlMs,
); ).catch(() => null);
if (blob) ctrl.cw_Data = blob;
} else {
await ctrl.hydrateCwData();
} }
console.log(
`[PERF:buildCompany] ${(performance.now() - _ct0).toFixed(0)}ms (strategy=${strategy}, hit=miss)`,
);
return ctrl; return ctrl;
} }
@@ -93,17 +105,14 @@ async function buildActivities(
ttlMs?: number; ttlMs?: number;
}, },
): Promise<ActivityController[]> { ): Promise<ActivityController[]> {
const _at0 = performance.now();
const strategy = opts?.strategy ?? "cache-then-cw"; const strategy = opts?.strategy ?? "cache-then-cw";
// ── cw-first: always fetch from CW ────────────────────────────────── // ── cw-first: always fetch from CW (and cache the result) ──────────
if (strategy === "cw-first") { if (strategy === "cw-first") {
const collection = await activityCw.fetchByOpportunity(cwOpportunityId); const arr = opts?.ttlMs
const arr = collection.map((item) => item); ? await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs)
if (opts?.ttlMs) { : await activityCw.fetchByOpportunityDirect(cwOpportunityId);
await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs).catch(
() => {},
);
}
return arr.map((item) => new ActivityController(item)); return arr.map((item) => new ActivityController(item));
} }
@@ -116,12 +125,13 @@ async function buildActivities(
// cache-only stops here — return empty (background job will fill it) // cache-only stops here — return empty (background job will fill it)
if (strategy === "cache-only") return []; if (strategy === "cache-only") return [];
// cache-then-cw: cache miss — fall through to CW // cache-then-cw: cache miss — fetch once and cache in one pass
const collection = await activityCw.fetchByOpportunity(cwOpportunityId); const arr = opts?.ttlMs
const arr = collection.map((item) => item); ? await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs)
if (opts?.ttlMs) { : await activityCw.fetchByOpportunityDirect(cwOpportunityId);
await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs).catch(() => {}); console.log(
} `[PERF:buildActivities] ${(performance.now() - _at0).toFixed(0)}ms (strategy=${strategy}, hit=miss, count=${arr.length})`,
);
return arr.map((item) => new ActivityController(item)); return arr.map((item) => new ActivityController(item));
} }
@@ -192,6 +202,7 @@ export const opportunities = {
identifier: string | number, identifier: string | number,
opts?: { fresh?: boolean }, opts?: { fresh?: boolean },
): Promise<OpportunityController> { ): Promise<OpportunityController> {
const _t0 = performance.now();
const strategy: "cache-only" | "cache-then-cw" | "cw-first" = opts?.fresh const strategy: "cache-only" | "cache-then-cw" | "cw-first" = opts?.fresh
? "cw-first" ? "cw-first"
: "cache-then-cw"; : "cache-then-cw";
@@ -205,6 +216,8 @@ export const opportunities = {
: { id: identifier as string }, : { id: identifier as string },
include: { company: true }, include: { company: true },
}); });
const _t1 = performance.now();
console.log(`[PERF:fetchItem] DB lookup: ${(_t1 - _t0).toFixed(0)}ms`);
if (!existing) { if (!existing) {
throw new GenericError({ throw new GenericError({
@@ -232,12 +245,26 @@ export const opportunities = {
// Try the Redis cache first // Try the Redis cache first
cwData = await getCachedOppCwData(existing.cwOpportunityId); cwData = await getCachedOppCwData(existing.cwOpportunityId);
} }
const _t2 = performance.now();
console.log(
`[PERF:fetchItem] Redis cache check: ${(_t2 - _t1).toFixed(0)}ms (hit=${!!cwData})`,
);
// ── Parallel block: CW opp fetch + activities + company ────────────
// Activities and company hydration only need existing.cwOpportunityId
// and existing.company — both available from the initial DB lookup —
// so they can run concurrently with the CW opp fetch + DB update.
const cwOppPromise = (async () => {
if (cwData) return; // cache hit — nothing to do
if (!cwData) {
// Cache miss or forced fresh — fetch from CW and cache
cwData = ttlMs cwData = ttlMs
? await fetchAndCacheOppCwData(existing.cwOpportunityId, ttlMs) ? await fetchAndCacheOppCwData(existing.cwOpportunityId, ttlMs)
: await opportunityCw.fetch(existing.cwOpportunityId); : await opportunityCw.fetch(existing.cwOpportunityId);
const _t2b = performance.now();
console.log(
`[PERF:fetchItem] CW opp fetch: ${(_t2b - _t2).toFixed(0)}ms`,
);
if (!cwData) { if (!cwData) {
throw new GenericError({ throw new GenericError({
@@ -264,15 +291,27 @@ export const opportunities = {
data: { ...mapped, companyId }, data: { ...mapped, companyId },
include: { company: true }, include: { company: true },
}); });
} console.log(
`[PERF:fetchItem] DB update: ${(performance.now() - _t2b).toFixed(0)}ms`,
);
})();
// Hydrate activities and company in parallel const _t3 = performance.now();
const [activities, company] = await Promise.all([ // Hydrate activities and company in parallel with CW opp fetch
buildActivities(record.cwOpportunityId, { strategy, ttlMs }), const [, activities, company] = await Promise.all([
record.company cwOppPromise,
? buildCompanyController(record.company, { strategy, ttlMs }) buildActivities(existing.cwOpportunityId, { strategy, ttlMs }),
existing.company
? buildCompanyController(existing.company, { strategy, ttlMs })
: Promise.resolve(undefined), : Promise.resolve(undefined),
]); ]);
const _t4 = performance.now();
console.log(
`[PERF:fetchItem] parallel block (cw+activities+company): ${(_t4 - _t3).toFixed(0)}ms`,
);
console.log(
`[PERF:fetchItem] TOTAL: ${(_t4 - _t0).toFixed(0)}ms (strategy=${strategy}, ttl=${ttlMs}ms)`,
);
return new OpportunityController(record, { return new OpportunityController(record, {
company, company,
+8 -6
View File
@@ -16,8 +16,8 @@
* | # | Condition | TTL (ms) | TTL (human) | Rationale | * | # | Condition | TTL (ms) | TTL (human) | Rationale |
* |---|------------------------------------------------------------------|----------|-------------|--------------------------------------------------------------------| * |---|------------------------------------------------------------------|----------|-------------|--------------------------------------------------------------------|
* | 1 | `closedFlag` is `true` | `null` | Do not cache| Closed records are rarely accessed; caching wastes memory. | * | 1 | `closedFlag` is `true` | `null` | Do not cache| Closed records are rarely accessed; caching wastes memory. |
* | 2 | `expectedCloseDate` OR `lastUpdated` is within the last **5 days**| 30 000 | 30 seconds | High-activity window — data changes frequently and must stay fresh.| * | 2 | `expectedCloseDate` OR `lastUpdated` is within the last **5 days**| 60 000 | 60 seconds | High-activity window — data changes frequently and must stay fresh.|
* | 3 | `expectedCloseDate` OR `lastUpdated` is within the last **14 days**| 60 000 | 60 seconds | Moderate activity — still relevant, but changes less often. | * | 3 | `expectedCloseDate` OR `lastUpdated` is within the last **14 days**| 90 000 | 90 seconds | Moderate activity — still relevant, but changes less often. |
* | 4 | Everything else (older than 14 days) | 900 000 | 15 minutes | Low activity — safe to serve from cache for longer. | * | 4 | Everything else (older than 14 days) | 900 000 | 15 minutes | Low activity — safe to serve from cache for longer. |
* *
* ## Evaluation order * ## Evaluation order
@@ -62,11 +62,13 @@
// Constants // Constants
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/** 30 seconds TTL for high-activity records (within 5 days). */ /** 60 seconds TTL for high-activity records (within 5 days).
export const TTL_HIGH_ACTIVITY = 30_000; * Must exceed the 30-second background refresh interval so the cache
* stays warm between cycles. */
export const TTL_HIGH_ACTIVITY = 60_000;
/** 60 seconds TTL for moderate-activity records (within 14 days). */ /** 90 seconds TTL for moderate-activity records (within 14 days). */
export const TTL_MODERATE_ACTIVITY = 60_000; export const TTL_MODERATE_ACTIVITY = 90_000;
/** 15 minutes TTL for low-activity / stale records. */ /** 15 minutes TTL for low-activity / stale records. */
export const TTL_LOW_ACTIVITY = 900_000; export const TTL_LOW_ACTIVITY = 900_000;
@@ -18,7 +18,7 @@
* | 1 | Status is **Won**, **Lost**, **Pending Won**, or **Pending Lost** | `null` | No cache | Products on terminal / near-terminal opps are static; no need to keep them warm. | * | 1 | Status is **Won**, **Lost**, **Pending Won**, or **Pending Lost** | `null` | No cache | Products on terminal / near-terminal opps are static; no need to keep them warm. |
* | 2 | Opportunity is **not cacheable** (main cache TTL is `null`) | `null` | No cache | If the opp itself is evicted, sub-resources follow suit. | * | 2 | Opportunity is **not cacheable** (main cache TTL is `null`) | `null` | No cache | If the opp itself is evicted, sub-resources follow suit. |
* | 3 | `lastUpdated` is within the last **3 days** | 15 000 | 15 seconds | Actively-worked deals — products are being edited and need near-real-time freshness. | * | 3 | `lastUpdated` is within the last **3 days** | 15 000 | 15 seconds | Actively-worked deals — products are being edited and need near-real-time freshness. |
* | 4 | Everything else | 1 800 000 | 30 minutes | Lazy on-demand cache: fetched when requested, expires after 30 min without refresh. | * | 4 | Everything else | 1 200 000 | 20 minutes | Lazy on-demand cache: fetched when requested, expires after 20 min without refresh. |
* *
* ## Evaluation order * ## Evaluation order
* *
@@ -44,11 +44,13 @@ import { QUOTE_STATUSES } from "../../types/QuoteStatuses";
// Constants // Constants
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/** 15 seconds — TTL for hot products (opportunity updated within 3 days). */ /** 45 seconds — TTL for hot products (opportunity updated within 3 days).
export const PRODUCTS_TTL_HOT = 15_000; * Must exceed the 30-second background refresh interval so the cache
* stays warm between cycles. */
export const PRODUCTS_TTL_HOT = 45_000;
/** 30 minutes — TTL for on-demand product cache (lazy fallback). */ /** 20 minutes — TTL for on-demand product cache (lazy fallback). */
export const PRODUCTS_TTL_LAZY = 1_800_000; export const PRODUCTS_TTL_LAZY = 1_200_000;
/** 3 days in milliseconds. */ /** 3 days in milliseconds. */
const THREE_DAYS_MS = 3 * 24 * 60 * 60 * 1000; const THREE_DAYS_MS = 3 * 24 * 60 * 60 * 1000;
+25 -16
View File
@@ -310,25 +310,34 @@ export async function fetchAndCacheCompanyCwData(
ttlMs: number, ttlMs: number,
): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> { ): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> {
try { try {
const cwCompany = await fetchCwCompanyById(cwCompanyId); // Fetch company and all-contacts in parallel — the allContacts URL
// can be constructed directly without the company response.
const [cwCompany, allContactsData] = await Promise.all([
fetchCwCompanyById(cwCompanyId),
withCwRetry(
() =>
connectWiseApi.get(
`/company/companies/${cwCompanyId}/contacts?pageSize=1000`,
),
{ label: `company#${cwCompanyId}/allContacts` },
),
]);
if (!cwCompany) return null; if (!cwCompany) return null;
const contactHref = cwCompany.defaultContact?._info?.contact_href; // Default contact: derive from allContacts instead of making an
const defaultContactData = contactHref // extra serial CW call. The company object carries the default
? await withCwRetry(() => connectWiseApi.get(contactHref), { // contact's ID, so we can pull it from the list we already fetched.
label: `company#${cwCompanyId}/defaultContact`, const defaultContactId = cwCompany.defaultContact?.id;
}) const defaultContactData = defaultContactId
: undefined; ? ((allContactsData.data as any[]).find(
(c: any) => c.id === defaultContactId,
const allContactsData = await withCwRetry( ) ?? null)
() => : null;
connectWiseApi.get(`${cwCompany._info.contacts_href}&pageSize=1000`),
{ label: `company#${cwCompanyId}/allContacts` },
);
const blob = { const blob = {
company: cwCompany, company: cwCompany,
defaultContact: defaultContactData?.data ?? null, defaultContact: defaultContactData,
allContacts: allContactsData.data, allContacts: allContactsData.data,
}; };
@@ -491,11 +500,11 @@ export async function invalidateProductsCache(
} }
/** /**
* Site TTL 30 minutes. Site/address data rarely changes so we cache * Site TTL 20 minutes. Site/address data rarely changes so we cache
* aggressively. The background refresh does NOT proactively warm site keys; * aggressively. The background refresh does NOT proactively warm site keys;
* they are populated lazily on the first detail-view request. * they are populated lazily on the first detail-view request.
*/ */
const SITE_TTL_MS = 1_800_000; const SITE_TTL_MS = 1_200_000;
/** /**
* Fetch a CW company site from ConnectWise and cache the result. * Fetch a CW company site from ConnectWise and cache the result.
@@ -0,0 +1,79 @@
/**
* CW API Concurrency Limiter
*
* Limits the number of simultaneous in-flight requests to the ConnectWise
* API. CW responds significantly slower under high concurrency (observed
 * ~3× slower at 56 concurrent than at 9 concurrent), so bounding the
* parallelism actually reduces total wall-clock time.
*
* Implemented as an Axios request interceptor that gates on a simple
* counting semaphore. When the limit is reached, new requests queue and
* resolve in FIFO order as earlier requests complete.
*/
import type { AxiosInstance, InternalAxiosRequestConfig } from "axios";
// ---------------------------------------------------------------------------
// Semaphore
// ---------------------------------------------------------------------------
/**
 * Simple counting semaphore: at most `_max` concurrent holders; extra
 * acquirers are parked in a FIFO queue until a slot is released.
 */
class Semaphore {
  /** Number of slots currently held. */
  private _inUse = 0;
  /** Parked acquirers, woken oldest-first as slots free up. */
  private _waiters: (() => void)[] = [];

  constructor(private _max: number) {}

  /**
   * Take a slot. Resolves immediately while capacity remains; otherwise
   * the returned promise settles once a current holder releases.
   */
  acquire(): Promise<void> {
    if (this._inUse >= this._max) {
      return new Promise<void>((wake) => {
        this._waiters.push(wake);
      });
    }
    this._inUse += 1;
    return Promise.resolve();
  }

  /**
   * Return a slot. When someone is waiting, the slot is handed straight
   * to the oldest waiter — the in-use count is intentionally untouched,
   * since ownership transfers rather than ends. Otherwise it decrements.
   */
  release(): void {
    const wake = this._waiters.shift();
    if (wake === undefined) {
      this._inUse -= 1;
    } else {
      wake();
    }
  }
}
// ---------------------------------------------------------------------------
// Interceptor attachment
// ---------------------------------------------------------------------------
/**
* Attach a concurrency-limiting interceptor to an Axios instance.
*
* @param api - The Axios instance to limit.
* @param max - Maximum concurrent in-flight requests (default: 6).
*/
export function attachCwConcurrencyLimiter(api: AxiosInstance, max = 6): void {
  const gate = new Semaphore(max);

  // Every outgoing request must hold a semaphore slot before it fires;
  // queued requests proceed FIFO as earlier ones finish.
  const gateRequest = async (config: InternalAxiosRequestConfig) => {
    await gate.acquire();
    return config;
  };

  // The slot is returned on success and failure alike, without altering
  // the fulfillment/rejection seen by downstream handlers.
  const passThrough = <T>(response: T): T => {
    gate.release();
    return response;
  };
  const rethrow = (error: unknown) => {
    gate.release();
    return Promise.reject(error);
  };

  api.interceptors.request.use(gateRequest);
  api.interceptors.response.use(passThrough, rethrow);
}
@@ -102,3 +102,40 @@ export const resolveMember = async (
cwMemberId: cwMember?.id ?? null, cwMemberId: cwMember?.id ?? null,
}; };
}; };
/**
* Resolve Multiple CW Identifiers in a Single Batch
*
* Same as `resolveMember` but batches the DB query so that N identifiers
* require only **one** `findMany` instead of N `findFirst` calls.
*
* @param identifiers - Array of CW member identifiers
 * @returns Map from each identifier to its ResolvedMember
*/
export const resolveMembers = async (
  identifiers: string[],
): Promise<Map<string, ResolvedMember>> => {
  const distinct = [...new Set(identifiers)];

  // One batched lookup replaces N per-identifier findFirst round-trips.
  const rows = await prisma.user.findMany({
    where: { cwIdentifier: { in: distinct } },
    select: { id: true, cwIdentifier: true },
  });
  const localIdByIdentifier = new Map(
    rows.map((row) => [row.cwIdentifier, row.id]),
  );

  const resolved = new Map<string, ResolvedMember>();
  for (const identifier of distinct) {
    const cwMember = memberCache.get(identifier);
    // Prefer the cached CW member's full name; fall back to the raw
    // identifier when the cache misses or the assembled name is blank.
    const displayName = cwMember
      ? `${cwMember.firstName} ${cwMember.lastName}`.trim() || identifier
      : identifier;
    resolved.set(identifier, {
      id: localIdByIdentifier.get(identifier) ?? null,
      identifier,
      name: displayName,
      cwMemberId: cwMember?.id ?? null,
    });
  }
  return resolved;
};
+22 -4
View File
@@ -66,10 +66,28 @@ export const catalogCw = {
return allItems; return allItems;
}, },
// Fetch a single catalog item by its CW catalog ID.
// Tries `/procurement/catalog/:id` first; if that request rejects for any
// reason, retries against `/procurement/catalog/items/:id`.
// NOTE(review): the bare `catch` discards the first error entirely, so a
// transient failure on the primary route silently falls through — confirm
// the two-endpoint fallback is intentional.
fetchByCatalogId: async (cwCatalogId: number): Promise<CatalogItem> => {
try {
const response = await connectWiseApi.get(
`/procurement/catalog/${cwCatalogId}`,
);
return response.data;
} catch {
const fallback = await connectWiseApi.get(
`/procurement/catalog/items/${cwCatalogId}`,
);
return fallback.data;
}
},
fetch: async (id: string): Promise<CatalogItem> => { fetch: async (id: string): Promise<CatalogItem> => {
const response = await connectWiseApi.get( const numericId = Number(id);
`/procurement/catalog/items/${id}`, if (!Number.isFinite(numericId)) {
); const response = await connectWiseApi.get(
return response.data; `/procurement/catalog/items/${id}`,
);
return response.data;
}
return catalogCw.fetchByCatalogId(numericId);
}, },
}; };
@@ -0,0 +1,469 @@
import { prisma, redis, connectWiseApi } from "../../../constants";
import { withCwRetry } from "../withCwRetry";
import { catalogCw } from "./catalog";
import { CatalogItem } from "./catalog.types";
type JsonObject = Record<string, unknown>;

// One product row pulled out of an adjustment's detail lines, with the
// quantity fields normalized to display strings ("-" when absent).
type TrackedProduct = {
  cwCatalogId: number;
  product: string;
  onHand: string;
  inventory: string;
  // Composite identity (catalogId|product|onHand|inventory) used to
  // detect per-row changes between polling cycles.
  key: string;
};

// Normalized view of a single CW adjustment: its stable key, the product
// rows we track, and a deterministic signature of those rows.
type AdjustmentSnapshot = {
  key: string;
  trackedRows: TrackedProduct[];
  signature: string;
};

const ADJUSTMENTS_ENDPOINT = "/procurement/adjustments?pageSize=1000";
const CATALOG_ITEM_CACHE_PREFIX = "catalog:item:cw:";
// 20 minutes, in seconds (Redis EX takes seconds).
const CATALOG_ITEM_CACHE_TTL_SECONDS = 20 * 60;
// Per-cycle cap on catalog-item syncs (overridable via env).
const MAX_SYNC_PER_CYCLE = Number(
  process.env.CW_ADJUSTMENT_SYNC_MAX_PER_CYCLE ?? "50",
);
// Minimum gap between re-syncs of the same catalog item (default 10 min).
const SYNC_COOLDOWN_MS = Number(
  process.env.CW_ADJUSTMENT_SYNC_COOLDOWN_MS ?? `${10 * 60 * 1000}`,
);

// Module-level polling state: the previous cycle's snapshots, the derived
// per-product signatures, last successful sync time per catalog item, and
// a re-entrancy guard for the listener.
let previous = new Map<string, AdjustmentSnapshot>();
let previousProductState = new Map<number, string>();
const lastSyncedAt = new Map<number, number>();
let inFlight = false;
/** Narrow `value` to a plain (non-null, non-array) object. */
const isObject = (value: unknown): value is JsonObject =>
  typeof value === "object" && value !== null && !Array.isArray(value);

/** Coerce `value` to a JsonObject, substituting `{}` for anything else. */
const toObject = (value: unknown): JsonObject => {
  if (!isObject(value)) return {};
  return value;
};

/**
 * Deterministic, order-insensitive serialization used for change
 * signatures: object keys are sorted and array entries are serialized
 * then sorted, so structurally-equivalent payloads always produce the
 * same string regardless of key/element order.
 *
 * Fix: `undefined` (and other values JSON.stringify cannot represent,
 * which it turns into a non-string `undefined`) is normalized to "null"
 * so the declared `string` return type actually holds.
 */
const stableStringify = (value: unknown): string => {
  if (value === undefined) return "null";
  if (Array.isArray(value)) {
    const entries = value.map((entry) => stableStringify(entry)).sort();
    return `[${entries.join(",")}]`;
  }
  if (isObject(value)) {
    const keys = Object.keys(value).sort();
    const pairs = keys.map(
      (key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`,
    );
    return `{${pairs.join(",")}}`;
  }
  // JSON.stringify returns undefined for symbols/functions — map to "null".
  return JSON.stringify(value) ?? "null";
};

/**
 * Walk a dotted `path` ("a.b.c") through nested objects; returns null as
 * soon as any hop is missing or not an object.
 */
const readPathValue = (obj: JsonObject, path: string): unknown => {
  const parts = path.split(".");
  let current: unknown = obj;
  for (const part of parts) {
    if (!isObject(current)) return null;
    current = current[part];
  }
  return current;
};

/** First non-empty (not null/undefined/"") value found among `paths`. */
const firstValue = (obj: JsonObject, paths: string[]): unknown => {
  for (const path of paths) {
    const value = readPathValue(obj, path);
    if (value === null || value === undefined || value === "") continue;
    return value;
  }
  return null;
};

/**
 * Parse a finite number from a number or numeric string; null otherwise.
 * Fix: whitespace-only strings are rejected — `Number("  ")` coerces to
 * 0, which previously turned blank fields into a valid catalog ID of 0.
 */
const asNumber = (value: unknown): number | null => {
  if (typeof value === "number" && Number.isFinite(value)) return value;
  if (typeof value === "string" && value.trim().length > 0) {
    const parsed = Number(value);
    if (Number.isFinite(parsed)) return parsed;
  }
  return null;
};

/**
 * Render any value as a short display string. Empty values become "-";
 * objects prefer a human-meaningful scalar field (name/identifier/id/
 * code/value) before falling back to the full stable serialization.
 */
const asText = (value: unknown): string => {
  if (value === null || value === undefined || value === "") return "-";
  if (
    typeof value === "string" ||
    typeof value === "number" ||
    typeof value === "boolean"
  ) {
    return String(value);
  }
  if (Array.isArray(value)) {
    return `[${value.map((entry) => asText(entry)).join(",")}]`;
  }
  if (!isObject(value)) return String(value);
  const preferredFields = ["name", "identifier", "id", "code", "value"];
  for (const field of preferredFields) {
    const fieldValue = readPathValue(value, field);
    if (fieldValue === null || fieldValue === undefined || fieldValue === "")
      continue;
    if (typeof fieldValue === "object") continue;
    return String(fieldValue);
  }
  return stableStringify(value);
};

/**
 * Stable identity for a CW adjustment. Tries the usual ID-bearing fields
 * in order; when none yields a value, falls back to a content-derived
 * key so anonymous records still dedupe deterministically.
 */
const adjustmentKey = (adjustment: JsonObject): string => {
  const keyPaths = [
    "id",
    "adjustmentId",
    "procurementAdjustmentId",
    "recordId",
    "recId",
    "_info.id",
    "_info.href",
  ];
  for (const path of keyPaths) {
    const key = firstValue(adjustment, [path]);
    const keyText = asText(key);
    if (keyText !== "-") return keyText;
  }
  return `anon:${stableStringify(adjustment)}`;
};
/**
 * Extract a tracked product row from one adjustment detail line.
 * Returns null when the line has no resolvable catalog ID, or when it
 * carries neither an on-hand nor an inventory quantity.
 */
const trackedRow = (detail: JsonObject): TrackedProduct | null => {
  const idPaths = [
    "catalogItem.id",
    "catalogItemId",
    "catalog.id",
    "catalogId",
    "item.id",
    "itemId",
    "product.id",
    "productId",
    "id",
  ];
  const onHandPaths = [
    "onHand",
    "onHandQty",
    "onHandQuantity",
    "qtyOnHand",
    "quantityOnHand",
    "quantity.onHand",
  ];
  const inventoryPaths = [
    "inventory",
    "inventoryQty",
    "inventoryLevel",
    "quantity",
    "qty",
  ];
  const namePaths = [
    "product.name",
    "product.identifier",
    "item.name",
    "item.identifier",
    "catalogItem.name",
    "catalogItem.identifier",
    "productName",
    "productIdentifier",
    "sku",
    "identifier",
  ];

  const cwCatalogId = asNumber(firstValue(detail, idPaths));
  if (!cwCatalogId) return null;

  const onHand = asText(firstValue(detail, onHandPaths));
  const inventory = asText(firstValue(detail, inventoryPaths));
  // A row with neither quantity tells us nothing about inventory.
  if (onHand === "-" && inventory === "-") return null;

  const product = asText(firstValue(detail, namePaths));
  return {
    cwCatalogId,
    product,
    onHand,
    inventory,
    key: `${cwCatalogId}|${product}|${onHand}|${inventory}`,
  };
};

/**
 * Collect tracked rows for an adjustment. Candidate detail arrays are
 * tried in order (adjustmentDetails, details, lineItems); the first one
 * that yields rows wins. Falls back to treating the adjustment object
 * itself as a single row.
 */
const trackedRows = (adjustment: JsonObject): TrackedProduct[] => {
  const candidatePaths = ["adjustmentDetails", "details", "lineItems"];
  for (const path of candidatePaths) {
    const candidate = readPathValue(adjustment, path);
    if (!Array.isArray(candidate)) continue;
    const rows = candidate
      .map((entry) => trackedRow(toObject(entry)))
      .filter((entry): entry is TrackedProduct => entry !== null)
      .sort((a, b) => a.key.localeCompare(b.key));
    if (rows.length > 0) return rows;
  }
  const rootRow = trackedRow(adjustment);
  return rootRow ? [rootRow] : [];
};
/**
 * Build a key→snapshot map for the fetched adjustment rows, normalizing
 * each row's tracked products and computing its change signature.
 */
const snapshot = (rows: unknown[]): Map<string, AdjustmentSnapshot> => {
  const byKey = new Map<string, AdjustmentSnapshot>();
  for (const raw of rows) {
    const adjustment = toObject(raw);
    const key = adjustmentKey(adjustment);
    const tracked = trackedRows(adjustment);
    byKey.set(key, {
      key,
      trackedRows: tracked,
      signature: stableStringify(tracked),
    });
  }
  return byKey;
};
/**
 * Catalog IDs whose signature is new or different in `after` relative to
 * `before`. IDs present only in `before` (removals) are deliberately not
 * reported — there is nothing to re-sync for them.
 */
const changedCatalogIds = (
  before: Map<number, string>,
  after: Map<number, string>,
): Set<number> => {
  const changed = new Set<number>();
  for (const [cwCatalogId, nextSignature] of after) {
    const prevSignature = before.get(cwCatalogId);
    const isNewOrDifferent = !prevSignature || prevSignature !== nextSignature;
    if (isNewOrDifferent) changed.add(cwCatalogId);
  }
  return changed;
};
/**
 * Collapse all adjustment snapshots into one signature per catalog item:
 * the stable serialization of every distinct tracked-row key seen for
 * that item. Comparing these between cycles reveals inventory changes.
 */
const productState = (
  adjustments: Map<string, AdjustmentSnapshot>,
): Map<number, string> => {
  const keysByProduct = new Map<number, Set<string>>();
  for (const snap of adjustments.values()) {
    for (const row of snap.trackedRows) {
      let keys = keysByProduct.get(row.cwCatalogId);
      if (!keys) {
        keys = new Set<string>();
        keysByProduct.set(row.cwCatalogId, keys);
      }
      keys.add(row.key);
    }
  }
  const signatures = new Map<number, string>();
  for (const [cwCatalogId, keys] of keysByProduct) {
    signatures.set(cwCatalogId, stableStringify([...keys].sort()));
  }
  return signatures;
};
const applySyncGuards = (ids: number[]): number[] => {
const now = Date.now();
const cooledIds = ids.filter((cwCatalogId) => {
const last = lastSyncedAt.get(cwCatalogId);
if (!last) return true;
return now - last >= SYNC_COOLDOWN_MS;
});
if (cooledIds.length <= MAX_SYNC_PER_CYCLE) return cooledIds;
return cooledIds.slice(0, MAX_SYNC_PER_CYCLE);
};
/**
 * Fetch the full adjustments page from CW (with retry). Tolerates both
 * a bare-array response body and a `{ data: [...] }` envelope; anything
 * else yields an empty list.
 */
const fetchAdjustments = async (): Promise<unknown[]> => {
  const response = await withCwRetry(
    () => connectWiseApi.get(ADJUSTMENTS_ENDPOINT),
    { label: "inventory-adjustments", maxAttempts: 3 },
  );
  const body = response.data;
  if (Array.isArray(body)) return body;
  return isObject(body) && Array.isArray(body.data) ? body.data : [];
};
/** Redis key for a synced catalog item's cache entry. */
const cacheKey = (cwCatalogId: number) =>
  `${CATALOG_ITEM_CACHE_PREFIX}${cwCatalogId}`;

/**
 * Best-effort parse of the CW `_info.lastUpdated` timestamp; falls back
 * to "now" when the field is missing or unparseable.
 */
const cwLastUpdated = (item: CatalogItem): Date => {
  const stamp = item._info?.lastUpdated;
  if (!stamp) return new Date();
  const parsed = new Date(stamp);
  return Number.isNaN(parsed.getTime()) ? new Date() : parsed;
};
const syncCatalogItem = async (cwCatalogId: number): Promise<boolean> => {
try {
const item = await withCwRetry(
() => catalogCw.fetchByCatalogId(cwCatalogId),
{
label: `catalog-item:${cwCatalogId}`,
maxAttempts: 3,
},
);
const onHand = await withCwRetry(
() => catalogCw.fetchInventoryOnHand(cwCatalogId),
{
label: `catalog-onhand:${cwCatalogId}`,
maxAttempts: 3,
},
);
const persisted = await prisma.catalogItem.upsert({
where: { cwCatalogId },
create: {
cwCatalogId,
identifier: item.identifier,
name: item.description,
description: item.description,
customerDescription: item.customerDescription,
internalNotes: item.notes,
category: item.category?.name,
categoryCwId: item.category?.id,
subcategory: item.subcategory?.name,
subcategoryCwId: item.subcategory?.id,
manufacturer: item.manufacturer?.name,
manufactureCwId: item.manufacturer?.id,
partNumber: item.manufacturerPartNumber,
vendorName: item.vendor?.name,
vendorSku: item.vendorSku,
vendorCwId: item.vendor?.id,
price: item.price,
cost: item.cost,
inactive: item.inactiveFlag,
salesTaxable: item.taxableFlag,
onHand,
cwLastUpdated: cwLastUpdated(item),
},
update: {
identifier: item.identifier,
name: item.description,
description: item.description,
customerDescription: item.customerDescription,
internalNotes: item.notes,
category: item.category?.name,
categoryCwId: item.category?.id,
subcategory: item.subcategory?.name,
subcategoryCwId: item.subcategory?.id,
manufacturer: item.manufacturer?.name,
manufactureCwId: item.manufacturer?.id,
partNumber: item.manufacturerPartNumber,
vendorName: item.vendor?.name,
vendorSku: item.vendorSku,
vendorCwId: item.vendor?.id,
price: item.price,
cost: item.cost,
inactive: item.inactiveFlag,
salesTaxable: item.taxableFlag,
onHand,
cwLastUpdated: cwLastUpdated(item),
},
});
await redis.set(
cacheKey(cwCatalogId),
JSON.stringify({
cwCatalogId,
onHand,
cwItem: item,
dbItem: persisted,
syncedAt: new Date().toISOString(),
}),
"EX",
CATALOG_ITEM_CACHE_TTL_SECONDS,
);
return true;
} catch (err) {
console.error(
`[inventory-adjustments] failed to sync catalog item ${cwCatalogId}`,
err,
);
return false;
}
};
/**
 * One polling cycle of the CW inventory-adjustment listener.
 *
 * Fetches all adjustments, diffs per-product signatures against the
 * previous cycle, and re-syncs catalog items whose tracked inventory
 * rows changed (subject to the cooldown + per-cycle cap enforced by
 * applySyncGuards). The very first cycle only captures a baseline and
 * syncs nothing. Overlapping invocations are coalesced by `inFlight`.
 */
export const listenInventoryAdjustments = async (): Promise<void> => {
  if (inFlight) return;
  inFlight = true;
  try {
    const rows = await fetchAdjustments();
    const current = snapshot(rows);
    const currentProductState = productState(current);

    // First run: remember what we saw, but don't treat it all as changed.
    if (previous.size === 0) {
      previous = current;
      previousProductState = currentProductState;
      console.log(
        `[inventory-adjustments] baseline captured (${current.size} adjustments, ${currentProductState.size} products)`,
      );
      return;
    }

    const changedIds = [
      ...changedCatalogIds(previousProductState, currentProductState),
    ].sort((a, b) => a - b);
    const guardedIds = applySyncGuards(changedIds);

    // Baseline advances before the sync loop, so a failed sync is NOT
    // re-detected next cycle unless the product's rows change again.
    // NOTE(review): confirm that dropping failed syncs (rather than
    // retrying them) is the intended behavior.
    previous = current;
    previousProductState = currentProductState;

    if (guardedIds.length === 0) return;

    // Sync serially to keep CW load low; cooldown stamps only on success.
    let successCount = 0;
    for (const cwCatalogId of guardedIds) {
      const ok = await syncCatalogItem(cwCatalogId);
      if (!ok) continue;
      lastSyncedAt.set(cwCatalogId, Date.now());
      successCount += 1;
    }

    const skippedByCooldown = changedIds.length - guardedIds.length;
    console.log(
      `[inventory-adjustments] inventory changed for ${changedIds.length} products, queued ${guardedIds.length}, synced ${successCount}, cooldown/cap skipped ${skippedByCooldown}`,
    );
  } catch (err) {
    console.error("[inventory-adjustments] listener failed", err);
  } finally {
    inFlight = false;
  }
};
@@ -2,6 +2,31 @@ import { prisma } from "../../../constants";
import { events } from "../../globalEvents"; import { events } from "../../globalEvents";
import { catalogCw } from "./catalog"; import { catalogCw } from "./catalog";
// Bounded-parallel execution settings for CW-bound work.
const CONCURRENCY = 6;
const BATCH_DELAY_MS = 250;

/** Resolve after `ms` milliseconds. */
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Run tasks in batches of CONCURRENCY, pausing BATCH_DELAY_MS between
 * batches to keep CW request pressure low. Rejections are counted rather
 * than rethrown; the total number of failed tasks is returned.
 */
const runSlowParallel = async (
  tasks: Array<() => Promise<void>>,
): Promise<number> => {
  let failures = 0;
  for (let start = 0; start < tasks.length; start += CONCURRENCY) {
    const settled = await Promise.allSettled(
      tasks.slice(start, start + CONCURRENCY).map((task) => task()),
    );
    failures += settled.filter((entry) => entry.status === "rejected").length;
    const isLastBatch = start + CONCURRENCY >= tasks.length;
    if (!isLastBatch) await sleep(BATCH_DELAY_MS);
  }
  return failures;
};
export const refreshCatalog = async () => { export const refreshCatalog = async () => {
events.emit("cw:catalog:refresh:check"); events.emit("cw:catalog:refresh:check");
@@ -46,101 +71,104 @@ export const refreshCatalog = async () => {
staleCount: staleIds.length, staleCount: staleIds.length,
}); });
// 4. Fetch full catalog data, then filter to only stale items // 4. Fetch full CW item data for stale IDs using slow, bounded concurrency
const staleIdSet = new Set(staleIds); const cwItemMap = new Map<number, any>();
const allCwItems = await catalogCw.fetchAllItemsFromCw(); const itemFetchTasks: Array<() => Promise<void>> = staleIds.map(
const allStaleItems = new Map<number, any>(); (cwId) => async () => {
const item = await catalogCw.fetchByCatalogId(cwId);
cwItemMap.set(cwId, item);
},
);
const itemFetchFailures = await runSlowParallel(itemFetchTasks);
for (const [id, item] of allCwItems) { // 5. Fetch inventory onHand for stale IDs using the same slow parallel strategy
if (staleIdSet.has(id)) {
allStaleItems.set(id, item);
}
}
// 5. Batch fetch inventory onHand for stale items (50 concurrent)
const onHandMap = new Map<number, number>(); const onHandMap = new Map<number, number>();
const batchSize = 50; const inventoryTasks: Array<() => Promise<void>> = staleIds.map(
(cwId) => async () => {
try {
const onHand = await catalogCw.fetchInventoryOnHand(cwId);
onHandMap.set(cwId, onHand);
} catch {
onHandMap.set(cwId, 0);
}
},
);
const inventoryFailures = await runSlowParallel(inventoryTasks);
for (let i = 0; i < staleIds.length; i += batchSize) { // 6. Upsert stale/new items with bounded slow parallel execution
const batch = staleIds.slice(i, i + batchSize); let updatedCount = 0;
await Promise.all( const upsertTasks: Array<() => Promise<void>> = staleIds.map(
batch.map(async (cwId) => { (cwId) => async () => {
try { const item = cwItemMap.get(cwId);
const onHand = await catalogCw.fetchInventoryOnHand(cwId); if (!item) return;
onHandMap.set(cwId, onHand);
} catch { const cwLastUpdated = item._info?.lastUpdated
onHandMap.set(cwId, 0); ? new Date(item._info.lastUpdated)
} : new Date();
}), const onHand = onHandMap.get(cwId) ?? 0;
await prisma.catalogItem.upsert({
where: { cwCatalogId: cwId },
create: {
cwCatalogId: cwId,
identifier: item.identifier,
name: item.description,
description: item.description,
customerDescription: item.customerDescription,
internalNotes: item.notes,
category: item.category?.name,
categoryCwId: item.category?.id,
subcategory: item.subcategory?.name,
subcategoryCwId: item.subcategory?.id,
manufacturer: item.manufacturer?.name,
manufactureCwId: item.manufacturer?.id,
partNumber: item.manufacturerPartNumber,
vendorName: item.vendor?.name,
vendorSku: item.vendorSku,
vendorCwId: item.vendor?.id,
price: item.price,
cost: item.cost,
inactive: item.inactiveFlag,
salesTaxable: item.taxableFlag,
onHand,
cwLastUpdated,
},
update: {
name: item.description,
identifier: item.identifier,
description: item.description,
customerDescription: item.customerDescription,
internalNotes: item.notes,
category: item.category?.name,
categoryCwId: item.category?.id,
subcategory: item.subcategory?.name,
subcategoryCwId: item.subcategory?.id,
manufacturer: item.manufacturer?.name,
manufactureCwId: item.manufacturer?.id,
partNumber: item.manufacturerPartNumber,
vendorName: item.vendor?.name,
vendorSku: item.vendorSku,
vendorCwId: item.vendor?.id,
price: item.price,
cost: item.cost,
inactive: item.inactiveFlag,
salesTaxable: item.taxableFlag,
onHand,
cwLastUpdated,
},
});
updatedCount += 1;
},
);
const upsertFailures = await runSlowParallel(upsertTasks);
const failedTasks = itemFetchFailures + inventoryFailures + upsertFailures;
if (failedTasks > 0) {
console.warn(
`[catalog-refresh] ${failedTasks} slow-parallel task(s) failed; remaining items will retry next cycle`,
); );
} }
// 6. Upsert only the stale/new items
const updatedCount = (
await Promise.all(
staleIds.map(async (cwId) => {
const item = allStaleItems.get(cwId);
if (!item) return null;
const cwLastUpdated = item._info?.lastUpdated
? new Date(item._info.lastUpdated)
: new Date();
const onHand = onHandMap.get(cwId) ?? 0;
return await prisma.catalogItem.upsert({
where: { cwCatalogId: cwId },
create: {
cwCatalogId: cwId,
identifier: item.identifier,
name: item.description,
description: item.description,
customerDescription: item.customerDescription,
internalNotes: item.notes,
category: item.category?.name,
categoryCwId: item.category?.id,
subcategory: item.subcategory?.name,
subcategoryCwId: item.subcategory?.id,
manufacturer: item.manufacturer?.name,
manufactureCwId: item.manufacturer?.id,
partNumber: item.manufacturerPartNumber,
vendorName: item.vendor?.name,
vendorSku: item.vendorSku,
vendorCwId: item.vendor?.id,
price: item.price,
cost: item.cost,
inactive: item.inactiveFlag,
salesTaxable: item.taxableFlag,
onHand,
cwLastUpdated,
},
update: {
name: item.description,
identifier: item.identifier,
description: item.description,
customerDescription: item.customerDescription,
internalNotes: item.notes,
category: item.category?.name,
categoryCwId: item.category?.id,
subcategory: item.subcategory?.name,
subcategoryCwId: item.subcategory?.id,
manufacturer: item.manufacturer?.name,
manufactureCwId: item.manufacturer?.id,
partNumber: item.manufacturerPartNumber,
vendorName: item.vendor?.name,
vendorSku: item.vendorSku,
vendorCwId: item.vendor?.id,
price: item.price,
cost: item.cost,
inactive: item.inactiveFlag,
salesTaxable: item.taxableFlag,
onHand,
cwLastUpdated,
},
});
}),
)
).filter(Boolean).length;
events.emit("cw:catalog:refresh:completed", { events.emit("cw:catalog:refresh:completed", {
totalCw: cwSummaries.size, totalCw: cwSummaries.size,
totalDb: dbItems.length, totalDb: dbItems.length,
@@ -2,6 +2,11 @@ import { prisma } from "../../../constants";
import { events } from "../../globalEvents"; import { events } from "../../globalEvents";
import { catalogCw } from "./catalog"; import { catalogCw } from "./catalog";
const CONCURRENCY = 6;
const BATCH_DELAY_MS = 250;
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
export const refreshInventory = async () => { export const refreshInventory = async () => {
events.emit("cw:inventory:refresh:check"); events.emit("cw:inventory:refresh:check");
@@ -23,13 +28,13 @@ export const refreshInventory = async () => {
totalItems: dbItems.length, totalItems: dbItems.length,
}); });
// 2. Batch fetch inventory onHand for all items (50 concurrent) // 2. Slow-parallel fetch inventory onHand for all items
const onHandMap = new Map<number, number>(); const onHandMap = new Map<number, number>();
const batchSize = 150; let failedCount = 0;
for (let i = 0; i < dbItems.length; i += batchSize) { for (let i = 0; i < dbItems.length; i += CONCURRENCY) {
const batch = dbItems.slice(i, i + batchSize); const batch = dbItems.slice(i, i + CONCURRENCY);
await Promise.all( const results = await Promise.allSettled(
batch.map(async (item) => { batch.map(async (item) => {
try { try {
const onHand = await catalogCw.fetchInventoryOnHand(item.cwCatalogId); const onHand = await catalogCw.fetchInventoryOnHand(item.cwCatalogId);
@@ -39,6 +44,13 @@ export const refreshInventory = async () => {
} }
}), }),
); );
for (const result of results) {
if (result.status === "rejected") failedCount += 1;
}
if (i + CONCURRENCY >= dbItems.length) continue;
await sleep(BATCH_DELAY_MS);
} }
// 3. Only update items where onHand has changed // 3. Only update items where onHand has changed
@@ -71,4 +83,10 @@ export const refreshInventory = async () => {
totalItems: dbItems.length, totalItems: dbItems.length,
updatedCount, updatedCount,
}); });
if (failedCount > 0) {
console.warn(
`[inventory-refresh] ${failedCount} task(s) failed; fallback values were used and will retry next sweep`,
);
}
}; };
+7
View File
@@ -383,6 +383,13 @@ export const PERMISSION_NODES = {
], ],
}, },
cwCallbacks: {
name: "ConnectWise Callback Routes",
description:
"Inbound ConnectWise callback endpoints. These routes are intentionally unauthenticated and do not require permission nodes.",
permissions: [],
},
sales: { sales: {
name: "Sales Permissions", name: "Sales Permissions",
description: "Permissions for accessing and managing sales opportunities", description: "Permissions for accessing and managing sales opportunities",
+600
View File
@@ -0,0 +1,600 @@
/**
* Test Script: CW Forecast Item Edit & Partial Cancellation
*
* This script performs read-write operations against the ConnectWise API:
*
* 1. Search all open opportunities for a forecast item with description
* matching "labor Special Order" (case-insensitive).
* 2. Report the current state of that item (price, cost, qty, etc.).
* 3. PATCH the item: revenue 72,000 | cost 8,500 | quantity 67
* 4. Verify the update by re-fetching the forecast.
* 5. Cancel 13 units via the linked procurement product
* (partial cancellation: quantityCancelled = 13).
* 6. Verify the cancellation by re-fetching procurement data.
* 7. Report on every step.
*
* Usage: bun run test-cw-edit-item.ts
*/
import axios from "axios";
// Shared ConnectWise REST client — Basic auth + clientId ride on every request.
const cw = axios.create({
  baseURL: "https://ttscw.totaltech.net/v4_6_release/apis/3.0/",
  timeout: 30_000,
  headers: {
    Authorization: `Basic ${process.env.CW_BASIC_TOKEN}`,
    clientId: `${process.env.CW_CLIENT_ID}`,
    "Content-Type": "application/json",
  },
});
// ── Helpers ───────────────────────────────────────────────────────────────────
// Labeled console line, prefixed with a blank line for readability.
const log = (label: string, ...args: unknown[]) => {
  console.log(`\n[${label}]`, ...args);
};
// Horizontal rule for section breaks in the report output.
const divider = () => {
  console.log("─".repeat(72));
};
// Await-able pause, used to let CW settle between write and re-read.
const sleep = (ms: number) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
// Currency-style formatting: always two decimal places, US grouping.
const fmt = (n: number): string => {
  const twoDecimals = {
    minimumFractionDigits: 2,
    maximumFractionDigits: 2,
  };
  return n.toLocaleString("en-US", twoDecimals);
};
// ── Types (minimal, for this script) ──────────────────────────────────────────
// These shapes cover only the fields this script reads/writes; everything else
// CW returns flows through the index signatures untyped.
interface ForecastItem {
  id: number;
  forecastDescription: string;
  productDescription: string;
  quantity: number;
  // NOTE(review): Step 3 writes revenue/cost as line totals (unit × qty),
  // so these appear to be extended amounts, not per-unit — confirm against CW.
  revenue: number;
  cost: number;
  margin: number;
  forecastType: string;
  sequenceNumber: number;
  catalogItem?: { id: number; identifier: string };
  status?: { id: number; name: string };
  opportunity?: { id: number; name: string };
  [key: string]: unknown;
}
// An opportunity's forecast: a container of forecast line items.
interface Forecast {
  id: number;
  forecastItems: ForecastItem[];
  [key: string]: unknown;
}
// Procurement product row; links back to a forecast item via forecastDetailId.
interface ProcurementProduct {
  id: number;
  forecastDetailId: number;
  description: string;
  quantity: number;
  price: number;
  cost: number;
  cancelledFlag: boolean;
  quantityCancelled: number;
  cancelledReason: string | null;
  cancelledBy: string | null;
  cancelledDate: string | null;
  opportunity?: { id: number };
  [key: string]: unknown;
}
// ── Main ──────────────────────────────────────────────────────────────────────
/**
 * Drives the full read-write exercise described in the file header:
 * locate the "Special Order" forecast item on opportunity 5150, PATCH its
 * revenue/cost/quantity, cancel 13 units via the linked procurement product,
 * and verify every step against fresh CW reads.
 *
 * Fix: the patch-op log previously printed `${op.path}${op.value}` with no
 * separator ("…/revenue4824000"); a " → " separator is now inserted.
 */
async function main() {
  divider();
  log("START", "CW Forecast Item Edit & Cancellation Test");
  log("START", `Timestamp: ${new Date().toISOString()}`);
  divider();

  // ── Step 1: Find the "labor Special Order" forecast item ────────────────
  const OPP_ID = 5150;
  log(
    "SEARCH",
    `Looking for forecast item matching "labor Special Order" on opportunity ${OPP_ID}...`,
  );
  // Fetch the forecast for opportunity 5150 directly
  let targetOppId: number = OPP_ID;
  let targetItem: ForecastItem | null = null;
  let targetForecast: Forecast | null = null;
  const forecastRes = await cw.get(`/sales/opportunities/${OPP_ID}/forecast`);
  targetForecast = forecastRes.data as Forecast;
  // Match on either description field, case-insensitively.
  const match = (targetForecast.forecastItems ?? []).find(
    (fi: ForecastItem) =>
      fi.forecastDescription?.toLowerCase().includes("special order") ||
      fi.productDescription?.toLowerCase().includes("special order"),
  );
  if (match) {
    targetItem = match;
    log("SEARCH", `✓ FOUND forecast item on opportunity ${OPP_ID}`);
  }
  if (!targetItem || !targetForecast) {
    log(
      "SEARCH",
      `✗ No "labor Special Order" item found on opportunity ${OPP_ID}.`,
    );
    log("SEARCH", "All forecast items on this opportunity:");
    for (const fi of targetForecast.forecastItems ?? []) {
      console.log(
        `  id=${fi.id} "${fi.forecastDescription}" / "${fi.productDescription}"`,
      );
    }
    log("SEARCH", "Aborting.");
    process.exit(1);
  }

  // ── Step 2: Report current state ────────────────────────────────────────
  divider();
  log("CURRENT STATE", "Forecast item details BEFORE edit:");
  console.log(`  Opportunity ID: ${targetOppId}`);
  console.log(`  Forecast Item ID: ${targetItem.id}`);
  console.log(`  Forecast Description: ${targetItem.forecastDescription}`);
  console.log(`  Product Description: ${targetItem.productDescription}`);
  console.log(
    `  Catalog Item: ${targetItem.catalogItem?.identifier ?? "(none)"} (cwId=${targetItem.catalogItem?.id ?? "N/A"})`,
  );
  console.log(`  Forecast Type: ${targetItem.forecastType}`);
  console.log(
    `  Status: ${targetItem.status?.name ?? "?"} (id=${targetItem.status?.id ?? "?"})`,
  );
  console.log(`  Sequence Number: ${targetItem.sequenceNumber}`);
  console.log(`  ──────────────────────────────────`);
  console.log(`  Quantity: ${targetItem.quantity}`);
  console.log(`  Revenue (Price): $${fmt(targetItem.revenue)}`);
  console.log(`  Cost: $${fmt(targetItem.cost)}`);
  console.log(`  Margin: $${fmt(targetItem.margin)}`);
  // Also report all items on this opportunity for context
  const allItems = targetForecast.forecastItems ?? [];
  log(
    "CONTEXT",
    `Total forecast items on this opportunity: ${allItems.length}`,
  );
  for (const fi of allItems) {
    const marker = fi.id === targetItem.id ? " ◀ TARGET" : "";
    console.log(
      `  [${fi.sequenceNumber}] id=${fi.id} "${fi.forecastDescription}" ` +
        `qty=${fi.quantity} rev=$${fmt(fi.revenue)} cost=$${fmt(fi.cost)}${marker}`,
    );
  }

  // ── Step 3: PATCH the forecast item ─────────────────────────────────────
  divider();
  const UNIT_PRICE = 72_000;
  const UNIT_COST = 8_500;
  const QTY = 67;
  const TOTAL_REVENUE = UNIT_PRICE * QTY; // $4,824,000
  const TOTAL_COST = UNIT_COST * QTY; // $569,500
  log("EDIT", "Patching forecast item...");
  log(
    "EDIT",
    `  Unit price: $${fmt(UNIT_PRICE)} × ${QTY} = $${fmt(TOTAL_REVENUE)} (revenue)`,
  );
  log(
    "EDIT",
    `  Unit cost: $${fmt(UNIT_COST)} × ${QTY} = $${fmt(TOTAL_COST)} (cost)`,
  );
  log("EDIT", `  Quantity: ${QTY}`);
  // Find the index of our target item in the forecast array — the forecast
  // PATCH endpoint addresses items positionally (/forecastItems/<idx>/...).
  const forecastItems = targetForecast.forecastItems ?? [];
  const targetIdx = forecastItems.findIndex((fi) => fi.id === targetItem!.id);
  if (targetIdx === -1) {
    log(
      "EDIT",
      "✗ Could not find target item index in forecast array. Aborting.",
    );
    process.exit(1);
  }
  log("EDIT", `Target item is at index ${targetIdx} in forecastItems array.`);
  const patchOps = [
    {
      op: "replace",
      path: `/forecastItems/${targetIdx}/revenue`,
      value: TOTAL_REVENUE,
    },
    {
      op: "replace",
      path: `/forecastItems/${targetIdx}/cost`,
      value: TOTAL_COST,
    },
    { op: "replace", path: `/forecastItems/${targetIdx}/quantity`, value: QTY },
  ];
  log("EDIT", "Patch operations:");
  for (const op of patchOps) {
    // FIX: separate path and value so the log reads "replace /path → value"
    // instead of fusing them together ("…/revenue4824000").
    console.log(`  ${op.op} ${op.path} → ${op.value}`);
  }
  try {
    const patchRes = await cw.patch(
      `/sales/opportunities/${targetOppId}/forecast`,
      patchOps,
    );
    const updatedForecast: Forecast = patchRes.data;
    const updatedItem = (updatedForecast.forecastItems ?? [])[targetIdx];
    if (!updatedItem) {
      log("EDIT", "✗ Item not found at expected index after PATCH.");
    } else {
      log("EDIT", "✓ PATCH successful. Updated item:");
      console.log(`  Forecast Item ID: ${updatedItem.id}`);
      console.log(`  Forecast Description: ${updatedItem.forecastDescription}`);
      console.log(`  Quantity: ${updatedItem.quantity}`);
      console.log(`  Revenue (Price): $${fmt(updatedItem.revenue)}`);
      console.log(`  Cost: $${fmt(updatedItem.cost)}`);
      console.log(`  Margin: $${fmt(updatedItem.margin)}`);
      // Verify values match what we set
      const checks = [
        {
          field: "revenue",
          expected: TOTAL_REVENUE,
          actual: updatedItem.revenue,
        },
        { field: "cost", expected: TOTAL_COST, actual: updatedItem.cost },
        { field: "quantity", expected: QTY, actual: updatedItem.quantity },
      ];
      log("VERIFY EDIT", "Checking values match requested:");
      for (const check of checks) {
        const ok = check.actual === check.expected;
        console.log(
          `  ${ok ? "✓" : "✗"} ${check.field}: expected=${check.expected}, actual=${check.actual}`,
        );
      }
      // Update our reference for the cancellation step
      targetItem = updatedItem;
    }
  } catch (err: any) {
    log("EDIT", `✗ PATCH failed: ${err.response?.status ?? err.message}`);
    if (err.response?.data) {
      console.log("  Response:", JSON.stringify(err.response.data, null, 2));
    }
    // If quantity PATCH failed (read-only), try without quantity
    if (err.response?.status === 400 || err.response?.status === 422) {
      log(
        "EDIT",
        "Retrying without quantity (may be read-only on forecast items)...",
      );
      const retryOps = patchOps.filter((op) => !op.path.endsWith("/quantity"));
      try {
        const retryRes = await cw.patch(
          `/sales/opportunities/${targetOppId}/forecast`,
          retryOps,
        );
        const retryForecast: Forecast = retryRes.data;
        const retryItem = (retryForecast.forecastItems ?? [])[targetIdx];
        if (retryItem) {
          log(
            "EDIT",
            "✓ Retry PATCH successful (without quantity). Updated item:",
          );
          console.log(
            `  Quantity: ${retryItem.quantity} (unchanged — read-only)`,
          );
          console.log(`  Revenue (Price): $${fmt(retryItem.revenue)}`);
          console.log(`  Cost: $${fmt(retryItem.cost)}`);
          console.log(`  Margin: $${fmt(retryItem.margin)}`);
          targetItem = retryItem;
        }
      } catch (retryErr: any) {
        log(
          "EDIT",
          `✗ Retry also failed: ${retryErr.response?.status ?? retryErr.message}`,
        );
        if (retryErr.response?.data) {
          console.log(
            "  Response:",
            JSON.stringify(retryErr.response.data, null, 2),
          );
        }
      }
    }
  }

  // ── Step 4: Re-fetch and confirm final forecast state ───────────────────
  divider();
  log("RE-FETCH", "Fetching forecast to confirm final state...");
  await sleep(500);
  const confirmRes = await cw.get(
    `/sales/opportunities/${targetOppId}/forecast`,
  );
  const confirmedForecast: Forecast = confirmRes.data;
  const confirmedItem = (confirmedForecast.forecastItems ?? []).find(
    (fi) => fi.id === targetItem!.id,
  );
  if (confirmedItem) {
    log("CONFIRMED STATE", "Forecast item after edit:");
    console.log(`  Forecast Item ID: ${confirmedItem.id}`);
    console.log(`  Forecast Description: ${confirmedItem.forecastDescription}`);
    console.log(`  Quantity: ${confirmedItem.quantity}`);
    console.log(`  Revenue (Price): $${fmt(confirmedItem.revenue)}`);
    console.log(`  Cost: $${fmt(confirmedItem.cost)}`);
    console.log(`  Margin: $${fmt(confirmedItem.margin)}`);
  } else {
    log(
      "CONFIRMED STATE",
      "⚠ Could not find item by original ID — it may have been regenerated.",
    );
    log("CONFIRMED STATE", "All current forecast items:");
    for (const fi of confirmedForecast.forecastItems ?? []) {
      console.log(
        `  id=${fi.id} "${fi.forecastDescription}" qty=${fi.quantity} rev=$${fmt(fi.revenue)} cost=$${fmt(fi.cost)}`,
      );
    }
  }

  // ── Step 5: Cancel 13 items via procurement product ─────────────────────
  divider();
  log("CANCEL", "Cancelling 13 units on this item via procurement product...");
  // First, find existing procurement products linked to this opportunity
  const procRes = await cw.get(
    `/procurement/products?conditions=${encodeURIComponent(`opportunity/id=${targetOppId}`)}&pageSize=1000`,
  );
  const procProducts: ProcurementProduct[] = procRes.data;
  log(
    "CANCEL",
    `Found ${procProducts.length} procurement product(s) on this opportunity.`,
  );
  if (procProducts.length > 0) {
    for (const pp of procProducts) {
      console.log(
        `  Proc id=${pp.id} forecastDetailId=${pp.forecastDetailId} ` +
          `"${pp.description}" qty=${pp.quantity} price=$${fmt(pp.price ?? 0)} ` +
          `cancelled=${pp.cancelledFlag} qtyCancelled=${pp.quantityCancelled}`,
      );
    }
  }
  // Find the procurement product linked to our forecast item
  const linkedProc = procProducts.find(
    (pp) => pp.forecastDetailId === targetItem!.id,
  );
  if (linkedProc) {
    log("CANCEL", `Found linked procurement product: id=${linkedProc.id}`);
    log(
      "CANCEL",
      `Current state: cancelled=${linkedProc.cancelledFlag}, quantityCancelled=${linkedProc.quantityCancelled}`,
    );
    log("CANCEL", "Patching: quantityCancelled → 13, cancelledFlag → true");
    try {
      const cancelRes = await cw.patch(
        `/procurement/products/${linkedProc.id}`,
        [
          { op: "replace", path: "cancelledFlag", value: true },
          { op: "replace", path: "quantityCancelled", value: 13 },
          {
            op: "replace",
            path: "cancelledReason",
            value: "Test cancellation — 13 units",
          },
        ],
      );
      log("CANCEL", "✓ Cancellation PATCH successful.");
      console.log(`  cancelledFlag: ${cancelRes.data.cancelledFlag}`);
      console.log(`  quantityCancelled: ${cancelRes.data.quantityCancelled}`);
      console.log(`  cancelledReason: ${cancelRes.data.cancelledReason}`);
      console.log(
        `  cancelledBy: ${cancelRes.data.cancelledBy ?? "N/A"}`,
      );
      console.log(
        `  cancelledDate: ${cancelRes.data.cancelledDate ?? "N/A"}`,
      );
    } catch (err: any) {
      log(
        "CANCEL",
        `✗ Cancellation PATCH failed: ${err.response?.status ?? err.message}`,
      );
      if (err.response?.data) {
        console.log("  Response:", JSON.stringify(err.response.data, null, 2));
      }
    }
  } else {
    log(
      "CANCEL",
      `No procurement product linked to forecast item id=${targetItem!.id}.`,
    );
    log(
      "CANCEL",
      "Creating a procurement product first, then cancelling 13...",
    );
    try {
      // Create a procurement product linked to this forecast item
      const createProcRes = await cw.post("/procurement/products", {
        catalogItem: targetItem!.catalogItem?.id
          ? { id: targetItem!.catalogItem.id }
          : undefined,
        description:
          targetItem!.forecastDescription || targetItem!.productDescription,
        quantity: targetItem!.quantity || 67,
        price: targetItem!.revenue || 72_000,
        cost: targetItem!.cost || 8_500,
        billableOption: "Billable",
        opportunity: { id: targetOppId },
        forecastDetailId: targetItem!.id,
      });
      const newProc = createProcRes.data;
      log("CANCEL", `✓ Created procurement product id=${newProc.id}`);
      console.log(`  forecastDetailId: ${newProc.forecastDetailId}`);
      console.log(`  description: ${newProc.description}`);
      console.log(`  quantity: ${newProc.quantity}`);
      console.log(`  price: $${fmt(newProc.price ?? 0)}`);
      console.log(`  cost: $${fmt(newProc.cost ?? 0)}`);
      // Now cancel 13 units
      log("CANCEL", "Patching procurement product: quantityCancelled → 13...");
      const cancelRes = await cw.patch(`/procurement/products/${newProc.id}`, [
        { op: "replace", path: "cancelledFlag", value: true },
        { op: "replace", path: "quantityCancelled", value: 13 },
        {
          op: "replace",
          path: "cancelledReason",
          value: "Test cancellation — 13 units",
        },
      ]);
      log("CANCEL", "✓ Cancellation PATCH successful.");
      console.log(`  cancelledFlag: ${cancelRes.data.cancelledFlag}`);
      console.log(`  quantityCancelled: ${cancelRes.data.quantityCancelled}`);
      console.log(`  cancelledReason: ${cancelRes.data.cancelledReason}`);
      console.log(
        `  cancelledBy: ${cancelRes.data.cancelledBy ?? "N/A"}`,
      );
      console.log(
        `  cancelledDate: ${cancelRes.data.cancelledDate ?? "N/A"}`,
      );
    } catch (err: any) {
      log("CANCEL", `✗ Failed: ${err.response?.status ?? err.message}`);
      if (err.response?.data) {
        console.log("  Response:", JSON.stringify(err.response.data, null, 2));
      }
    }
  }

  // ── Step 6: Final verification ──────────────────────────────────────────
  divider();
  log("FINAL VERIFY", "Re-fetching all data for final report...");
  await sleep(500);
  // Re-fetch forecast
  const finalForecastRes = await cw.get(
    `/sales/opportunities/${targetOppId}/forecast`,
  );
  const finalForecast: Forecast = finalForecastRes.data;
  // Prefer an exact id match; fall back to description match in case CW
  // regenerated the line item during the edit.
  const finalItem =
    (finalForecast.forecastItems ?? []).find(
      (fi) => fi.id === targetItem!.id,
    ) ??
    (finalForecast.forecastItems ?? []).find(
      (fi) =>
        fi.forecastDescription?.toLowerCase().includes("special order") ||
        fi.productDescription?.toLowerCase().includes("special order"),
    );
  // Re-fetch procurement
  const finalProcRes = await cw.get(
    `/procurement/products?conditions=${encodeURIComponent(`opportunity/id=${targetOppId}`)}&pageSize=1000`,
  );
  const finalProcs: ProcurementProduct[] = finalProcRes.data;
  log("FINAL STATE — FORECAST ITEM", "");
  if (finalItem) {
    console.log(`  Forecast Item ID: ${finalItem.id}`);
    console.log(`  Forecast Description: ${finalItem.forecastDescription}`);
    console.log(`  Quantity: ${finalItem.quantity}`);
    console.log(`  Revenue (Price): $${fmt(finalItem.revenue)}`);
    console.log(`  Cost: $${fmt(finalItem.cost)}`);
    console.log(`  Margin: $${fmt(finalItem.margin)}`);
  } else {
    console.log("  ⚠ Target item not found in final forecast.");
  }
  log("FINAL STATE — PROCUREMENT", `${finalProcs.length} product(s):`);
  for (const pp of finalProcs) {
    console.log(
      `  id=${pp.id} forecastDetailId=${pp.forecastDetailId} ` +
        `"${pp.description}" qty=${pp.quantity} cancelled=${pp.cancelledFlag} ` +
        `qtyCancelled=${pp.quantityCancelled} reason="${pp.cancelledReason ?? ""}"`,
    );
  }

  // ── Summary ─────────────────────────────────────────────────────────────
  divider();
  log("SUMMARY", "");
  // After cancelling 13 of 67, CW recalculates totals for remaining 54 units
  const expectedFinalRevenue = Math.round(UNIT_PRICE * (QTY - 13) * 100) / 100;
  const expectedFinalCost = Math.round(UNIT_COST * (QTY - 13) * 100) / 100;
  const editOk = finalItem
    ? Math.abs(finalItem.revenue - expectedFinalRevenue) < 1 &&
      Math.abs(finalItem.cost - expectedFinalCost) < 1
    : false;
  // NOTE(review): quantity is expected to still read the full 67 here even
  // after cancelling 13 — the cancellation is tracked on the procurement
  // side only. Confirm this matches CW behavior if this check fails.
  const qtyOk = finalItem ? finalItem.quantity === QTY : false;
  if (finalItem) {
    console.log(
      `  Expected final revenue ($${fmt(UNIT_PRICE)} × ${QTY - 13}): $${fmt(expectedFinalRevenue)}`,
    );
    console.log(
      `  Actual final revenue: $${fmt(finalItem.revenue)}`,
    );
    console.log(
      `  Expected final cost ($${fmt(UNIT_COST)} × ${QTY - 13}): $${fmt(expectedFinalCost)}`,
    );
    console.log(
      `  Actual final cost: $${fmt(finalItem.cost)}`,
    );
  }
  const cancelOk = finalProcs.some(
    (pp) =>
      pp.forecastDetailId === targetItem!.id &&
      pp.cancelledFlag === true &&
      pp.quantityCancelled === 13,
  );
  console.log(
    `  Unit price $${fmt(UNIT_PRICE)}/ea: `,
    editOk ? "✓ PASS" : "✗ FAIL",
  );
  console.log(
    `  Unit cost $${fmt(UNIT_COST)}/ea: `,
    editOk ? "✓ PASS" : "✗ FAIL",
  );
  console.log(
    `  Quantity set to ${QTY}: `,
    qtyOk ? "✓ PASS" : "✗ FAIL (may be read-only)",
  );
  console.log(
    "  13 units cancelled: ",
    cancelOk ? "✓ PASS" : "✗ FAIL",
  );
  const allPass = editOk && qtyOk && cancelOk;
  divider();
  log(
    "RESULT",
    allPass
      ? "✓ ALL CHECKS PASSED"
      : "⚠ SOME CHECKS DID NOT PASS — review output above",
  );
  divider();
}
// Top-level entry point: surface any unhandled failure and exit non-zero.
main().catch((err) => {
  const detail = err.response?.data ?? err.message;
  console.error("\n[FATAL]", detail);
  process.exit(1);
});
+4 -4
View File
@@ -11,12 +11,12 @@ const DAY_MS = 24 * 60 * 60 * 1000;
describe("computeProductsCacheTTL", () => { describe("computeProductsCacheTTL", () => {
// -- Constants ---------------------------------------------------------- // -- Constants ----------------------------------------------------------
test("PRODUCTS_TTL_HOT is 15 seconds", () => { test("PRODUCTS_TTL_HOT is 45 seconds", () => {
expect(PRODUCTS_TTL_HOT).toBe(15_000); expect(PRODUCTS_TTL_HOT).toBe(45_000);
}); });
test("PRODUCTS_TTL_LAZY is 30 minutes", () => { test("PRODUCTS_TTL_LAZY is 20 minutes", () => {
expect(PRODUCTS_TTL_LAZY).toBe(1_800_000); expect(PRODUCTS_TTL_LAZY).toBe(1_200_000);
}); });
// -- Won/Lost status set ------------------------------------------------ // -- Won/Lost status set ------------------------------------------------
+552
View File
@@ -0,0 +1,552 @@
import axios from "axios";
// Loose alias for any JSON object payload coming back from CW.
type JsonObject = Record<string, unknown>;
// One polled adjustment, pre-digested for change detection:
// `serialized` is the full canonical form, `trackedSignature` covers only
// the inventory/on-hand rows we care about.
type SnapshotItem = {
  key: string;
  serialized: string;
  data: JsonObject;
  trackedRows: TrackedRow[];
  trackedSignature: string;
};
// Normalized per-product inventory row extracted from an adjustment.
type TrackedRow = {
  key: string;
  product: string;
  onHand: string;
  inventory: string;
};
// Poll interval: one sweep per minute.
const POLL_MS = 60_000;
const CW_BASE_URL =
  process.env.CW_BASE_URL ??
  "https://ttscw.totaltech.net/v4_6_release/apis/3.0";
const ENDPOINT = "/procurement/adjustments?pageSize=1000";
const CW_BASIC_TOKEN = process.env.CW_BASIC_TOKEN;
const CW_CLIENT_ID = process.env.CW_CLIENT_ID;
// Fail fast at startup if credentials are missing — every request needs both.
if (!CW_BASIC_TOKEN || !CW_CLIENT_ID) {
  console.error(
    "Missing required env vars: CW_BASIC_TOKEN and/or CW_CLIENT_ID",
  );
  process.exit(1);
}
// Shared ConnectWise client; every request carries Basic auth + clientId.
const cw = axios.create({
  baseURL: CW_BASE_URL,
  timeout: 30_000,
  headers: {
    Authorization: `Basic ${CW_BASIC_TOKEN}`,
    clientId: CW_CLIENT_ID,
    "Content-Type": "application/json",
  },
});
// True for plain objects only — arrays and null are excluded.
const isObject = (value: unknown): value is JsonObject => {
  if (value === null) return false;
  if (Array.isArray(value)) return false;
  return typeof value === "object";
};
// Canonical, order-independent serialization used for change signatures.
// Object keys are sorted; array ENTRIES are serialized then sorted too,
// so element reorders deliberately do not register as changes.
const stableStringify = (value: unknown): string => {
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).sort().join(",")}]`;
  }
  if (isObject(value)) {
    const body = Object.keys(value)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`)
      .join(",");
    return `{${body}}`;
  }
  return JSON.stringify(value);
};
// Coerce to a plain object, substituting an empty one for anything else.
const toObject = (value: unknown): JsonObject =>
  isObject(value) ? value : {};
// Walk a dotted path ("a.b.c") through nested objects; null when any hop
// is not an object.
const readPathValue = (obj: JsonObject, path: string): unknown => {
  let cursor: unknown = obj;
  for (const segment of path.split(".")) {
    if (!isObject(cursor)) return null;
    cursor = cursor[segment];
  }
  return cursor;
};
// Usable map key: non-empty strings pass through, numbers stringify,
// everything else is rejected.
const asKey = (value: unknown): string | null => {
  switch (typeof value) {
    case "string":
      return value.length > 0 ? value : null;
    case "number":
      return value.toString();
    default:
      return null;
  }
};
// First non-empty (not null/undefined/"") value found across several paths.
const firstValue = (obj: JsonObject, paths: string[]): unknown => {
  for (const candidatePath of paths) {
    const candidate = readPathValue(obj, candidatePath);
    if (candidate !== null && candidate !== undefined && candidate !== "") {
      return candidate;
    }
  }
  return null;
};
// Stable identity for an adjustment: first usable id-like field, else a
// content hash of the whole record ("anon:<canonical form>").
const itemKey = (adjustment: JsonObject): string => {
  const candidatePaths = [
    "id",
    "adjustmentId",
    "procurementAdjustmentId",
    "recordId",
    "recId",
    "_info.id",
    "_info.href",
  ];
  for (const candidatePath of candidatePaths) {
    const candidate = asKey(readPathValue(adjustment, candidatePath));
    if (candidate) return candidate;
  }
  return `anon:${stableStringify(adjustment)}`;
};
// Condensed view of the fields worth reporting; "-" marks anything absent.
const summarize = (adjustment: JsonObject) => {
  const firstNonEmpty = (...paths: string[]) => {
    for (const path of paths) {
      const candidate = readPathValue(adjustment, path);
      const empty =
        candidate === null || candidate === undefined || candidate === "";
      if (!empty) return candidate;
    }
    return "-";
  };
  return {
    id: firstNonEmpty(
      "id",
      "adjustmentId",
      "procurementAdjustmentId",
      "recordId",
    ),
    type: firstNonEmpty(
      "type.name",
      "type.identifier",
      "type.id",
      "type",
      "adjustmentType.name",
      "adjustmentType",
      "transactionType.name",
      "transactionType",
    ),
    amount: firstNonEmpty("amount", "value", "total", "quantity"),
    status: firstNonEmpty("status.name", "status", "state"),
    description: firstNonEmpty("description", "summary", "notes"),
    updatedBy: firstNonEmpty("_info.updatedBy", "updatedBy", "lastUpdatedBy"),
    lastUpdated: firstNonEmpty(
      "_info.lastUpdated",
      "lastUpdated",
      "dateUpdated",
    ),
  };
};
// Display form of a value, with "-" standing in for empty/missing.
const normalizeValue = (value: unknown): string =>
  value === null || value === undefined || value === ""
    ? "-"
    : toDisplayValue(value);
// A quantity of "-" means the field was absent.
const isMeaningfulQuantity = (value: string) => value !== "-";
// Extract one normalized inventory row from an adjustment detail.
// Returns null when neither on-hand nor inventory quantity is present.
const toTrackedRow = (detail: JsonObject): TrackedRow | null => {
  const onHand = normalizeValue(
    firstValue(detail, [
      "onHand",
      "onHandQty",
      "onHandQuantity",
      "qtyOnHand",
      "quantityOnHand",
      "quantity.onHand",
    ]),
  );
  const inventory = normalizeValue(
    firstValue(detail, [
      "inventory",
      "inventoryQty",
      "inventoryLevel",
      "quantity",
      "qty",
    ]),
  );
  // Rows with neither quantity carry no inventory signal — drop them.
  if (!isMeaningfulQuantity(onHand) && !isMeaningfulQuantity(inventory)) {
    return null;
  }
  const product = normalizeValue(
    firstValue(detail, [
      "product.name",
      "product.identifier",
      "product.id",
      "item.name",
      "item.identifier",
      "item.id",
      "catalogItem.name",
      "catalogItem.identifier",
      "catalogItem.id",
      "productIdentifier",
      "productName",
      "sku",
      "identifier",
      "id",
    ]),
  );
  return {
    key: `${product}|${onHand}|${inventory}`,
    product,
    onHand,
    inventory,
  };
};
// Collect tracked rows from the first detail array that yields any, falling
// back to treating the adjustment itself as a single row.
const getTrackedRows = (adjustment: JsonObject): TrackedRow[] => {
  const detailCandidates = [
    readPathValue(adjustment, "adjustmentDetails"),
    readPathValue(adjustment, "details"),
    readPathValue(adjustment, "lineItems"),
  ];
  for (const candidate of detailCandidates) {
    if (!Array.isArray(candidate)) continue;
    const rows: TrackedRow[] = [];
    for (const entry of candidate) {
      const row = toTrackedRow(toObject(entry));
      if (row !== null) rows.push(row);
    }
    if (rows.length > 0) {
      return rows.sort((a, b) => a.key.localeCompare(b.key));
    }
  }
  const rootRow = toTrackedRow(adjustment);
  return rootRow ? [rootRow] : [];
};
// Human-readable rendering of any value: primitives stringify, arrays render
// element-wise, objects prefer a common label field, otherwise canonical JSON.
const toDisplayValue = (value: unknown): string => {
  if (value === null || value === undefined || value === "") return "-";
  const isPrimitive =
    typeof value === "string" ||
    typeof value === "number" ||
    typeof value === "boolean";
  if (isPrimitive) return String(value);
  if (Array.isArray(value)) {
    return `[${value.map(toDisplayValue).join(",")}]`;
  }
  if (!isObject(value)) return String(value);
  for (const labelPath of ["name", "identifier", "id", "value", "code"]) {
    const label = readPathValue(value, labelPath);
    if (label === null || label === undefined || label === "") continue;
    if (typeof label === "object") continue;
    return String(label);
  }
  return stableStringify(value);
};
// Same as normalizeValue: "-" for empty, display form otherwise.
const clean = (value: unknown): string =>
  value === null || value === undefined || value === ""
    ? "-"
    : toDisplayValue(value);
// Collect dotted paths where two values differ, capped at maxPaths.
// `paths` doubles as the shared accumulator across the recursion.
const diffPaths = (
  before: unknown,
  after: unknown,
  currentPath = "",
  paths: string[] = [],
  maxPaths = 6,
): string[] => {
  if (paths.length >= maxPaths) return paths;
  if (!isObject(before) || !isObject(after)) {
    // Leaf comparison uses the canonical form so equivalent structures match.
    if (stableStringify(before) !== stableStringify(after)) {
      paths.push(currentPath || "(root)");
    }
    return paths;
  }
  const allKeys = [
    ...new Set([...Object.keys(before), ...Object.keys(after)]),
  ].sort();
  for (const key of allKeys) {
    if (paths.length >= maxPaths) break;
    diffPaths(
      before[key],
      after[key],
      currentPath ? `${currentPath}.${key}` : key,
      paths,
      maxPaths,
    );
  }
  return paths;
};
// One-line report for an added ("+"), updated ("~") or removed ("-") record.
const toLine = (kind: "+" | "~" | "-", adjustment: JsonObject): string => {
  const s = summarize(adjustment);
  return `${kind} id=${clean(s.id)} type=${clean(s.type)} status=${clean(s.status)} amount=${clean(
    s.amount,
  )} by=${clean(s.updatedBy)} desc=${clean(s.description)}`;
};
// Report line for a changed record, listing field-level before→after deltas;
// falls back to raw path diffs when no summarized field moved.
const updatedToLine = (before: JsonObject, after: JsonObject): string => {
  const prev = summarize(before);
  const next = summarize(after);
  const changed: string[] = [];
  const track = (label: string, a: unknown, b: unknown) => {
    const left = clean(a);
    const right = clean(b);
    if (left !== right) changed.push(`${label}:${left}${right}`);
  };
  track("status", prev.status, next.status);
  track("amount", prev.amount, next.amount);
  track("by", prev.updatedBy, next.updatedBy);
  track("desc", prev.description, next.description);
  track("updated", prev.lastUpdated, next.lastUpdated);
  // Timestamp churn alone is noise; exclude it from the raw path diff.
  const noisyFields = new Set(["_info.lastUpdated"]);
  const rawDiffs = diffPaths(before, after).filter(
    (path) => !noisyFields.has(path),
  );
  const rawDelta =
    rawDiffs.length > 0 ? `fields:${rawDiffs.join(",")}` : "content changed";
  const delta = changed.length > 0 ? changed.join(" | ") : rawDelta;
  return `~ id=${clean(next.id)} type=${clean(next.type)} ${delta}`;
};
// Render a single tracked inventory row.
const formatTracked = (row: TrackedRow) =>
  `product=${row.product} onHand=${row.onHand} inventory=${row.inventory}`;
// First three tracked rows joined for display, with an overflow marker.
const renderTrackedRows = (rows: TrackedRow[]) => {
  const shown = rows.slice(0, 3).map(formatTracked).join(" ; ");
  const overflow = rows.length > 3 ? ` ; +${rows.length - 3} more` : "";
  return `${shown}${overflow}`;
};
// "+" line for a newly seen adjustment with tracked rows.
const trackedAddedLine = (item: SnapshotItem) => {
  const base = summarize(item.data);
  return `+ id=${clean(base.id)} type=${clean(base.type)} ${renderTrackedRows(item.trackedRows)}`;
};
// "-" line for an adjustment that disappeared from the poll.
const trackedRemovedLine = (item: SnapshotItem) => {
  const base = summarize(item.data);
  return `- id=${clean(base.id)} type=${clean(base.type)} ${renderTrackedRows(item.trackedRows)}`;
};
/**
 * "~" report line for an adjustment whose tracked inventory rows changed.
 * Rows are matched by product between the before/after snapshots; per-product
 * deltas (added / removed / quantity changed) are rendered, capped at three.
 *
 * Fix: the push-on-change branches used expression-position ternaries as
 * statements (`cond ? parts.push(...) : null;`) — replaced with plain `if`
 * statements; behavior is unchanged.
 */
const trackedUpdatedLine = (
  beforeItem: SnapshotItem,
  afterItem: SnapshotItem,
) => {
  const base = summarize(afterItem.data);
  const beforeMap = new Map(
    beforeItem.trackedRows.map((row) => [row.product, row]),
  );
  const afterMap = new Map(
    afterItem.trackedRows.map((row) => [row.product, row]),
  );
  const productKeys = [
    ...new Set([...beforeMap.keys(), ...afterMap.keys()]),
  ].sort();
  const deltas: string[] = [];
  for (const product of productKeys) {
    const prev = beforeMap.get(product);
    const next = afterMap.get(product);
    if (!prev && next) {
      deltas.push(
        `${product} added(onHand=${next.onHand},inventory=${next.inventory})`,
      );
      continue;
    }
    if (prev && !next) {
      deltas.push(`${product} removed`);
      continue;
    }
    if (!prev || !next) continue;
    const onHandChanged = prev.onHand !== next.onHand;
    const inventoryChanged = prev.inventory !== next.inventory;
    if (!onHandChanged && !inventoryChanged) continue;
    const parts: string[] = [];
    if (onHandChanged) {
      parts.push(`onHand:${prev.onHand}${next.onHand}`);
    }
    if (inventoryChanged) {
      parts.push(`inventory:${prev.inventory}${next.inventory}`);
    }
    deltas.push(`${product} ${parts.join(",")}`);
  }
  const shown = deltas.slice(0, 3).join(" ; ");
  const more = deltas.length > 3 ? ` ; +${deltas.length - 3} more` : "";
  const changes = shown || "inventory/on-hand changed";
  return `~ id=${clean(base.id)} type=${clean(base.type)} ${changes}${more}`;
};
// Digest raw poll rows into snapshot items carrying both a full canonical
// signature and a tracked-rows-only signature for change detection.
const toSnapshot = (rows: unknown[]): SnapshotItem[] =>
  rows.map((row) => {
    const data = toObject(row);
    const trackedRows = getTrackedRows(data);
    return {
      key: itemKey(data),
      serialized: stableStringify(data),
      data,
      trackedRows,
      trackedSignature: stableStringify(trackedRows),
    };
  });
// Fetch one page of adjustments; tolerates both bare-array and
// { data: [...] } response shapes, returning [] for anything else.
const fetchAdjustments = async (): Promise<unknown[]> => {
  const { data: payload } = await cw.get(ENDPOINT);
  if (Array.isArray(payload)) return payload;
  if (isObject(payload) && Array.isArray(payload.data)) return payload.data;
  return [];
};
// ISO timestamp for log prefixes.
const now = () => new Date().toISOString();
// Snapshot from the previous sweep, keyed by itemKey; empty until baseline.
let previous = new Map<string, SnapshotItem>();
// Main poll loop: fetch → snapshot → diff against previous → report → sleep.
// Runs forever; per-sweep errors are logged and the loop continues.
const run = async () => {
  console.log(
    `[${now()}] Watching ${CW_BASE_URL}${ENDPOINT} every ${POLL_MS / 1000}s`,
  );
  while (true) {
    try {
      const rows = await fetchAdjustments();
      const snapshotItems = toSnapshot(rows);
      const current = new Map(snapshotItems.map((item) => [item.key, item]));
      // First successful sweep only establishes the baseline — no diff yet.
      if (previous.size === 0) {
        previous = current;
        console.log(
          `[${now()}] Baseline captured (${current.size} adjustments)`,
        );
        await Bun.sleep(POLL_MS);
        continue;
      }
      const added: SnapshotItem[] = [];
      const removed: SnapshotItem[] = [];
      const updated: Array<{ before: SnapshotItem; after: SnapshotItem }> = [];
      // Classify each current item: new key → added (only if it has tracked
      // rows); known key → updated when the tracked signature moved.
      for (const [key, item] of current) {
        const previousItem = previous.get(key);
        if (!previousItem) {
          if (item.trackedRows.length === 0) continue;
          added.push(item);
          continue;
        }
        // Skip pairs where neither side ever had tracked inventory rows.
        const hasTrackedRows =
          item.trackedRows.length > 0 || previousItem.trackedRows.length > 0;
        if (!hasTrackedRows) continue;
        if (previousItem.trackedSignature !== item.trackedSignature) {
          updated.push({ before: previousItem, after: item });
        }
      }
      // Anything in the previous sweep but not the current one was removed.
      for (const [key, item] of previous) {
        if (!current.has(key) && item.trackedRows.length > 0)
          removed.push(item);
      }
      if (added.length > 0 || updated.length > 0 || removed.length > 0) {
        console.log(`\n[${now()}] Changes detected:`);
        console.log(`- added: ${added.length}`);
        console.log(`- updated: ${updated.length}`);
        console.log(`- removed: ${removed.length}`);
        if (added.length > 0) {
          console.log("\nAdded:");
          for (const item of added) {
            console.log(trackedAddedLine(item));
          }
        }
        if (updated.length > 0) {
          console.log("\nUpdated:");
          for (const item of updated) {
            console.log(trackedUpdatedLine(item.before, item.after));
          }
        }
        if (removed.length > 0) {
          console.log("\nRemoved:");
          for (const item of removed) {
            console.log(trackedRemovedLine(item));
          }
        }
      }
      if (added.length === 0 && updated.length === 0 && removed.length === 0) {
        console.log(`[${now()}] No changes (${current.size} adjustments)`);
      }
      previous = current;
    } catch (error) {
      // A failed poll keeps the old baseline; the next sweep retries.
      if (axios.isAxiosError(error)) {
        console.error(
          `[${now()}] Poll failed: ${error.response?.status ?? "ERR"}`,
          error.message,
        );
      } else {
        console.error(`[${now()}] Poll failed:`, error);
      }
    }
    await Bun.sleep(POLL_MS);
  }
};
// Entry point: a crash here is fatal (Bun runtime required for Bun.sleep).
run().catch((error) => {
  console.error("Watcher crashed:", error);
  process.exit(1);
});
+217
View File
@@ -0,0 +1,217 @@
import { appendFile, mkdir } from "node:fs/promises";
// Local listen port for the test webserver.
const port = 3001;
// Captured payloads are appended as JSON Lines under this directory, one
// file per process start (timestamp in the name, ':'/'.' made path-safe).
const logDir = "cw-api-logs";
const logFilePath = `${logDir}/test-webserver-${new Date().toISOString().replace(/[:.]/g, "-")}.jsonl`;
// HTTP methods whose request bodies are parsed as JSON.
const jsonBodyMethods = ["POST", "PUT", "PATCH", "DELETE"];
// Parsed JSON payload: either an object or an array.
type ParsedJson = Record<string, unknown> | unknown[];
// NOTE(review): buildSummary is defined later in this file (outside this
// view); EventSummary mirrors whatever shape it returns.
type EventSummary = ReturnType<typeof buildSummary>;
// ANSI escape codes used to colorize terminal output.
const color = {
  reset: "\x1b[0m",
  bold: "\x1b[1m",
  dim: "\x1b[2m",
  cyan: "\x1b[36m",
  blue: "\x1b[34m",
  yellow: "\x1b[33m",
  magenta: "\x1b[35m",
  green: "\x1b[32m",
  gray: "\x1b[90m",
};
// Wraps a string in the given ANSI tone, always resetting afterwards.
const paint = (value: string, tone: keyof typeof color): string =>
  color[tone] + value + color.reset;
// Decodes a JSON string. Returns the decoded object/array, or null when the
// text is invalid JSON or decodes to a non-object (number, string, boolean,
// or the JSON literal null).
const safeParseJson = (value: string): ParsedJson | null => {
  try {
    const decoded: unknown = JSON.parse(value);
    if (decoded !== null && typeof decoded === "object") {
      return decoded as ParsedJson;
    }
    return null;
  } catch {
    return null;
  }
};
// Normalizes a callback "Entity" field: JSON strings are decoded, objects
// and arrays pass through unchanged, everything else maps to null.
const parseEntity = (value: unknown): ParsedJson | null => {
  if (typeof value === "string") {
    return safeParseJson(value);
  }
  const isObjectLike = value !== null && typeof value === "object";
  return isObjectLike ? (value as ParsedJson) : null;
};
// Narrows a parsed JSON value to a plain record; arrays and null map to null.
const asObject = (value: ParsedJson | null): Record<string, unknown> | null =>
  value && !Array.isArray(value) ? value : null;
// Shallow-decodes JSON-encoded string fields of a record (ConnectWise
// callbacks deliver "Entity"/"Metadata"-style fields as JSON strings).
//
// For each string value that looks like a JSON object or array, attempt to
// parse it; on failure the original string is kept. Non-string values pass
// through untouched. Returns null when given null.
//
// Fix: the JSON sniff now ignores leading whitespace (" {…}" / "\n[…]").
// JSON.parse accepts leading whitespace, but the previous startsWith check
// skipped such fields entirely, leaving them as raw strings.
const parseJsonStringFields = (
  value: Record<string, unknown> | null,
): Record<string, unknown> | null => {
  if (!value) return null;
  const result: Record<string, unknown> = {};
  for (const [key, current] of Object.entries(value)) {
    if (typeof current !== "string") {
      result[key] = current;
      continue;
    }
    const head = current.trimStart();
    const looksLikeJson = head.startsWith("{") || head.startsWith("[");
    // Keep the original string when it isn't JSON or fails to parse.
    result[key] = looksLikeJson ? (safeParseJson(current) ?? current) : current;
  }
  return result;
};
// Extracts the query string into a key/value map, and additionally infers a
// "bare id" when the first query segment carries no "=" (ConnectWise-style
// callbacks like POST /ticket?36073).
const parseQuery = (url: URL) => {
  const params: Record<string, string> = {};
  for (const [key, value] of url.searchParams.entries()) {
    params[key] = value;
  }
  const rawQuery = url.search.replace(/^\?/, "");
  const [firstSegment = ""] = rawQuery.split("&");
  const inferredId =
    firstSegment && !firstSegment.includes("=") ? firstSegment : null;
  return { params, inferredId };
};
// Collapses a parsed callback body (plus its optionally decoded Entity)
// into a flat summary of the fields worth displaying/logging. Returns null
// when no JSON body was present.
const buildSummary = (
  parsedBody: Record<string, unknown> | null,
  parsedEntity: Record<string, unknown> | null,
) => {
  if (!parsedBody) return null;
  // Fall back to an empty record so every field lookup is a plain index.
  const entity = parsedEntity ?? {};
  return {
    messageId: parsedBody.MessageId ?? null,
    action: parsedBody.Action ?? null,
    type: parsedBody.Type ?? null,
    id: parsedBody.ID ?? null,
    memberId: parsedBody.MemberId ?? null,
    // Different CW resources name the status field differently.
    entityStatus:
      entity.StatusName ?? entity.TicketStatus ?? entity.Status ?? null,
    entitySummary: entity.Summary ?? entity.CompanyName ?? null,
    entityUpdatedBy: entity.UpdatedBy ?? null,
    entityLastUpdated: entity.LastUpdatedUTC ?? entity.LastUpdated ?? null,
  };
};
// Pretty-prints one received request to the terminal: a method/route header
// line, then type/action/id, actor/status, and title rows. Values come from
// the parsed summary with query-string fallbacks; "-" marks missing data.
const displayTerminalEvent = (
  method: string,
  routePath: string,
  query: { params: Record<string, string>; inferredId: string | null },
  summary: EventSummary,
  timestamp: string,
) => {
  const id = String(summary?.id ?? query.inferredId ?? "-");
  const action = String(summary?.action ?? query.params.action ?? "-");
  // Fix: routePath includes the query string, so strip it before deriving
  // the fallback event type — "/ticket?id=1" should render as "ticket",
  // not "ticket?id=1".
  const pathOnly = routePath.split("?")[0] ?? routePath;
  const eventType = String(summary?.type ?? pathOnly.split("/")[1] ?? "-");
  const actor = String(
    summary?.entityUpdatedBy ??
      query.params.memberId ??
      summary?.memberId ??
      "-",
  );
  const status = String(summary?.entityStatus ?? "-");
  const title = String(summary?.entitySummary ?? "-");
  const methodTone = method === "GET" ? "green" : "yellow";
  console.log();
  console.log(
    `${paint("●", "cyan")} ${paint(method, methodTone)} ${paint(routePath, "blue")} ${paint(timestamp, "gray")}`,
  );
  console.log(
    `${paint("type", "magenta")}: ${paint(eventType, "bold")} ${paint("action", "magenta")}: ${action} ${paint("id", "magenta")}: ${id}`,
  );
  console.log(
    `${paint("actor", "magenta")}: ${paint(actor, "cyan")} ${paint("status", "magenta")}: ${status}`,
  );
  console.log(`${paint("title", "magenta")}: ${title}`);
};
// Appends one structured record to the JSONL log file (one JSON document
// per line).
const writeLogRecord = async (
  record: Record<string, unknown>,
): Promise<void> => {
  const line = JSON.stringify(record);
  await appendFile(logFilePath, line + "\n", "utf8");
};
// Ensure the log directory exists before the server starts — every request
// handler below appends to a file inside it.
await mkdir(logDir, { recursive: true });
// Catch-all test webserver: accepts any method/path, parses JSON bodies,
// prints a colored summary to the terminal, appends a JSONL audit record,
// and always replies 200 with an echo of what it understood.
Bun.serve({
  port,
  async fetch(request) {
    const url = new URL(request.url);
    // Route label deliberately keeps the query string — it may carry a
    // bare id (e.g. "?36073") that parseQuery infers below.
    const routePath = `${url.pathname}${url.search}`;
    const method = request.method;
    const query = parseQuery(url);
    const startedAt = new Date().toISOString();
    // Only read the body for methods that conventionally carry one.
    const rawBody = jsonBodyMethods.includes(method)
      ? await request.text()
      : "";
    // Parse pipeline: raw text -> JSON -> object -> expand JSON-string
    // fields -> extract/decode the "Entity" field -> flat summary.
    const parsedJson = safeParseJson(rawBody);
    const parsedBody = asObject(parsedJson);
    const parsedBodyExpanded = parseJsonStringFields(parsedBody);
    const parsedEntity = asObject(parseEntity(parsedBodyExpanded?.Entity));
    const summary = buildSummary(parsedBodyExpanded, parsedEntity);
    const responseBody = {
      success: true,
      method,
      path: routePath,
      timestamp: startedAt,
      summary,
    };
    const responseStatus = 200;
    displayTerminalEvent(method, routePath, query, summary, startedAt);
    // Log is written (and awaited) before responding, so the record is
    // durable by the time the caller sees the 200.
    await writeLogRecord({
      timestamp: startedAt,
      request: {
        method,
        path: routePath,
        query,
        headers: Object.fromEntries(request.headers.entries()),
        // Empty body is normalized to null in the log for easy filtering.
        bodyRaw: rawBody || null,
        bodyParsed: parsedBodyExpanded,
        entityParsed: parsedEntity,
        summary,
      },
      response: {
        status: responseStatus,
        body: responseBody,
      },
    });
    return Response.json(responseBody, { status: responseStatus });
  },
});
console.log(`Test webserver listening on http://localhost:${port}`);
console.log(`Response/request log file: ${logFilePath}`);