diff --git a/API_ROUTES.md b/API_ROUTES.md index b74156a..33565bf 100644 --- a/API_ROUTES.md +++ b/API_ROUTES.md @@ -32,6 +32,56 @@ See [PERMISSIONS.md](PERMISSIONS.md) for the full list of field-level permission ## Authentication Routes +## ConnectWise Callback Routes + +### Receive ConnectWise Callback + +**POST** `/cw/callback/:secret/:resource` + +Receives ConnectWise callback/webhook payloads for supported resources and returns a normalized success response. + +**Authentication Required:** No + +**Path Parameters:** + +- `secret` — Shared callback secret, validated against `CW_CALLBACK_SECRET` when configured +- `resource` — one of: `opportunity`, `ticket`, `company`, `activity` + +**Behavior:** + +- Parses JSON request body when present. +- Decodes JSON-encoded payload fields such as `Entity`. +- Logs a concise callback summary to console. + +**Response:** + +```json +{ + "status": 200, + "message": "CW callback received.", + "data": { + "resource": "ticket", + "summary": { + "resource": "ticket", + "messageId": "1bec7421-204a-4b30-8b06-465915e9a0f5", + "action": "updated", + "type": "ticket", + "id": 36073, + "memberId": "jroberts", + "entityStatus": "In Progress", + "entitySummary": "Onsite Installation: Rough-In", + "entityUpdatedBy": "cirvine", + "entityLastUpdated": "2026-03-03T21:43:29.903" + }, + "bodyParsed": {}, + "receivedAt": "2026-03-04T00:00:00.000Z" + }, + "successful": true +} +``` + +--- + ### Get Authentication URI **GET** `/auth/uri` diff --git a/CACHING.md b/CACHING.md index 292be43..141b114 100644 --- a/CACHING.md +++ b/CACHING.md @@ -11,7 +11,7 @@ The API caches expensive ConnectWise (CW) API responses in **Redis** to reduce l ### Key design principles - **Adaptive TTLs** — cache durations are computed dynamically based on how "hot" an opportunity is (recently updated = shorter TTL = fresher data). -- **Background refresh** — a 30-second interval scans all open opportunities and re-fetches only expired cache keys. +- **Background refresh** — a 20-minute interval scans all open opportunities and re-fetches only expired cache keys. - **Bounded concurrency** — CW API calls are throttled via thunk-based batching to prevent overwhelming the upstream API. - **Graceful degradation** — transient CW errors (timeouts, network failures) are caught, logged, and retried on the next cycle rather than crashing the process. - **Priority ordering** — most recently updated opportunities are refreshed first so active deals get fresh data before stale ones. @@ -22,15 +22,21 @@ The API caches expensive ConnectWise (CW) API responses in **Redis** to reduce l Each non-closed opportunity can have up to 7 cached payloads in Redis: -| Cache Key Pattern | Data | Source | -|---|---|---| -| `opp:cw-data:{cwOpportunityId}` | Raw CW opportunity response | `GET /sales/opportunities/:id` | -| `opp:activities:{cwOpportunityId}` | CW activities array | `GET /sales/activities?conditions=opportunity/id=:id` | -| `opp:notes:{cwOpportunityId}` | CW notes array | `GET /sales/opportunities/:id/notes` | -| `opp:contacts:{cwOpportunityId}` | CW contacts array | `GET /sales/opportunities/:id/contacts` | -| `opp:products:{cwOpportunityId}` | Forecast + procurement products blob | `GET /sales/opportunities/:id/forecast` + `GET /procurement/products` | -| `opp:company-cw:{cw_CompanyId}` | Hydrated company + contacts blob | `GET /company/companies/:id` + contacts endpoints | -| `opp:site:{cwCompanyId}:{cwSiteId}` | Company site data | `GET /company/companies/:id/sites/:siteId` | +| Cache Key Pattern | Data | Source | +| ----------------------------------- | ------------------------------------ | --------------------------------------------------------------------- | +| `opp:cw-data:{cwOpportunityId}` | Raw CW opportunity response | `GET /sales/opportunities/:id` | +| `opp:activities:{cwOpportunityId}` | CW activities array | `GET /sales/activities?conditions=opportunity/id=:id` | +| `opp:notes:{cwOpportunityId}` | CW notes array | `GET /sales/opportunities/:id/notes` | +| `opp:contacts:{cwOpportunityId}` | CW contacts array | `GET /sales/opportunities/:id/contacts` | +| `opp:products:{cwOpportunityId}` | Forecast + procurement products blob | `GET /sales/opportunities/:id/forecast` + `GET /procurement/products` | +| `opp:company-cw:{cw_CompanyId}` | Hydrated company + contacts blob | `GET /company/companies/:id` + contacts endpoints | +| `opp:site:{cwCompanyId}:{cwSiteId}` | Company site data | `GET /company/companies/:id/sites/:siteId` | + +Inventory-adjustment-driven catalog sync adds a targeted product cache: + +| Cache Key Pattern | Data | Source | +| ------------------------ | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------- | +| `catalog:item:cw:{cwId}` | Full CW catalog item + computed `onHand` + DB row snapshot | `GET /procurement/adjustments` + `GET /procurement/catalog/:id` + catalog inventory endpoint | --- @@ -49,13 +55,13 @@ Three algorithms compute cache TTLs. All share the same input signals: Used for: opportunity CW data, activities, company CW data. -| # | Condition | TTL | Human | -|---|---|---|---| -| 1a | Closed > 30 days ago | `null` | Do not cache | -| 1b | Closed within 30 days | 900,000 ms | 15 minutes | -| 2 | `expectedCloseDate` or `lastUpdated` within **5 days** | 30,000 ms | 30 seconds | -| 3 | `expectedCloseDate` or `lastUpdated` within **14 days** | 60,000 ms | 60 seconds | -| 4 | Everything else | 900,000 ms | 15 minutes | +| # | Condition | TTL | Human | +| --- | ------------------------------------------------------- | ---------- | ------------ | +| 1a | Closed > 30 days ago | `null` | Do not cache | +| 1b | Closed within 30 days | 900,000 ms | 15 minutes | +| 2 | `expectedCloseDate` or `lastUpdated` within **5 days** | 30,000 ms | 30 seconds | +| 3 | `expectedCloseDate` or `lastUpdated` within **14 days** | 60,000 ms | 60 seconds | +| 4 | Everything else | 900,000 ms | 15 minutes | Rules are evaluated top-to-bottom; first match wins. @@ -65,13 +71,13 @@ Rules are evaluated top-to-bottom; first match wins. Used for: notes, contacts. -| # | Condition | TTL | Human | -|---|---|---|---| -| 1a | Closed > 30 days ago | `null` | Do not cache | -| 1b | Closed within 30 days | 300,000 ms | 5 minutes | -| 2 | Within **5 days** | 60,000 ms | 60 seconds | -| 3 | Within **14 days** | 120,000 ms | 2 minutes | -| 4 | Everything else | 300,000 ms | 5 minutes | +| # | Condition | TTL | Human | +| --- | --------------------- | ---------- | ------------ | +| 1a | Closed > 30 days ago | `null` | Do not cache | +| 1b | Closed within 30 days | 300,000 ms | 5 minutes | +| 2 | Within **5 days** | 60,000 ms | 60 seconds | +| 3 | Within **14 days** | 120,000 ms | 2 minutes | +| 4 | Everything else | 300,000 ms | 5 minutes | ### Products TTL (`computeProductsCacheTTL`) @@ -79,18 +85,18 @@ Used for: notes, contacts. Used for: forecast + procurement products. -| # | Condition | TTL | Human | -|---|---|---|---| -| 1 | Status is Won/Lost/Pending Won/Pending Lost | `null` | No cache | -| 2 | Main cache TTL is `null` | `null` | No cache | -| 3 | `lastUpdated` within **3 days** | 15,000 ms | 15 seconds | -| 4 | Everything else | 1,800,000 ms | 30 minutes | +| # | Condition | TTL | Human | +| --- | ------------------------------------------- | ------------ | ---------- | +| 1 | Status is Won/Lost/Pending Won/Pending Lost | `null` | No cache | +| 2 | Main cache TTL is `null` | `null` | No cache | +| 3 | `lastUpdated` within **3 days** | 15,000 ms | 15 seconds | +| 4 | Everything else | 1,200,000 ms | 20 minutes | -Products on terminal-status opportunities are never proactively cached. Non-hot products use a **lazy on-demand** cache — they're fetched when requested and cached for 30 minutes. +Products on terminal-status opportunities are never proactively cached. Non-hot products use a **lazy on-demand** cache — they're fetched when requested and cached for 20 minutes. ### Site TTL -Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely changes. Sites are **not** proactively warmed by the background refresh — they are populated lazily on the first detail-view request. +Sites use a fixed TTL of **20 minutes** (1,200,000 ms). Site/address data rarely changes. Sites are **not** proactively warmed by the background refresh — they are populated lazily on the first detail-view request. --- @@ -98,7 +104,7 @@ Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely **Function:** `refreshOpportunityCache()` in `src/modules/cache/opportunityCache.ts` -**Interval:** Every 30 seconds, triggered from `src/index.ts`. +**Interval:** Every 20 minutes, triggered from `src/index.ts`. ### Refresh cycle @@ -108,17 +114,61 @@ Sites use a fixed TTL of **30 minutes** (1,800,000 ms). Site/address data rarely 4. **Execute with bounded concurrency** — process thunks in batches of `CONCURRENCY` (currently **6**), with a `BATCH_DELAY_MS` (currently **250ms**) pause between batches. Each thunk is only invoked inside the batch loop. 5. **Emit events** — `cache:opportunities:refresh:started` and `cache:opportunities:refresh:completed` events are emitted for the event debugger. +### Inventory-adjustment listener cycle + +**Function:** `listenInventoryAdjustments()` in `src/modules/cw-utils/procurement/listenInventoryAdjustments.ts` + +**Interval:** Every 60 seconds, triggered from `src/index.ts`. + +1. Fetch `GET /procurement/adjustments?pageSize=1000`. +2. Build a normalized snapshot of tracked inventory rows (`cwCatalogId`, `onHand`, `inventory`) per adjustment. +3. Compare to previous snapshot; extract only changed product IDs. +4. For each changed product ID, fetch fresh CW catalog item + current on-hand. +5. Upsert `CatalogItem` in Postgres and write Redis key `catalog:item:cw:{cwId}` with a 20-minute TTL. + +Guardrails to prevent request storms: + +- Diffing is computed at **product state** level (grouped by `cwCatalogId`), not raw adjustment-row churn. +- Per-cycle syncs are capped (`CW_ADJUSTMENT_SYNC_MAX_PER_CYCLE`, default `50`). +- Product resync cooldown is enforced (`CW_ADJUSTMENT_SYNC_COOLDOWN_MS`, default `600000` ms / 10 min). + +This avoids full-catalog sweeps for small inventory movements and updates only the products implicated by adjustments. + +### Full procurement catalog refresh + +**Function:** `refreshCatalog()` in `src/modules/cw-utils/procurement/refreshCatalog.ts` + +**Interval:** Every 30 minutes, triggered from `src/index.ts`. + +The full catalog cache/DB sync uses the same slow-parallel thunk strategy as opportunity cache refreshes: + +- Build arrays of thunk tasks (`() => Promise`) for CW item fetches, inventory fetches, and DB upserts. +- Execute with bounded concurrency (`CONCURRENCY=6`). +- Pause between batches (`BATCH_DELAY_MS=250`) to avoid CW burst pressure. +- Log task failures and retry naturally on the next cycle. + +This keeps full-catalog refresh conservative while inventory-adjustment listener handles near-real-time targeted updates. + +### Full inventory sweep fallback + +`refreshInventory()` remains as a safety net but is intentionally infrequent: + +- Runs every **6 hours** from `src/index.ts` (no startup-time full sweep). +- Uses the same slow-parallel pattern (`CONCURRENCY=6`, `BATCH_DELAY_MS=250`) to avoid burst traffic. + +Most on-hand freshness now comes from the 60-second adjustment listener plus 30-minute full catalog refresh. + ### Concurrency control The thunk pattern is critical. Previously, tasks were pushed as already-executing promises (`refreshTasks.push(fetchAndCache(...))`), which meant all HTTP requests fired simultaneously regardless of the batching loop. The fix was changing the array type from `Promise[]` to `(() => Promise)[]` so requests only start when explicitly invoked: `batch.map((fn) => fn())`. ### Current tuning -| Parameter | Value | Effect | -|---|---|---| -| `CONCURRENCY` | 6 | Max simultaneous CW API requests per batch | -| `BATCH_DELAY_MS` | 250 | Milliseconds between batches | -| Refresh interval | 30 seconds | How often the full sweep runs | +| Parameter | Value | Effect | +| ---------------- | ---------- | ------------------------------------------ | +| `CONCURRENCY` | 6 | Max simultaneous CW API requests per batch | +| `BATCH_DELAY_MS` | 250 | Milliseconds between batches | +| Refresh interval | 20 minutes | How often the full sweep runs | At these settings, a full sweep of ~500 expired keys completes in ~1-2 minutes with zero CW errors and ~230ms median latency. @@ -142,11 +192,11 @@ Wraps CW API calls with exponential backoff retry on transient errors. ### Default configuration -| Parameter | Default | Description | -|---|---|---| -| `maxAttempts` | 3 | Total attempts including the first | -| `baseDelayMs` | 1,000 | Delay before first retry (doubles each retry: 1s → 2s → 4s) | -| `label` | — | Optional tag for log messages | +| Parameter | Default | Description | +| ------------- | ------- | ----------------------------------------------------------- | +| `maxAttempts` | 3 | Total attempts including the first | +| `baseDelayMs` | 1,000 | Delay before first retry (doubles each retry: 1s → 2s → 4s) | +| `label` | — | Optional tag for log messages | ### Usage @@ -181,16 +231,16 @@ LOG_CW_API=1 bun run dev ### Log entry fields -| Field | Type | Description | -|---|---|---| -| `timestamp` | string (ISO-8601) | When the request completed | -| `method` | string | HTTP method | -| `url` | string | Request URL (relative or absolute) | -| `baseURL` | string | Axios baseURL | -| `status` | number \| null | HTTP status (null on network error) | -| `durationMs` | number | Wall-clock time in milliseconds | -| `error` | string \| null | Error code + message, if any | -| `timeout` | number | Configured timeout in ms | +| Field | Type | Description | +| ------------ | ----------------- | ----------------------------------- | +| `timestamp` | string (ISO-8601) | When the request completed | +| `method` | string | HTTP method | +| `url` | string | Request URL (relative or absolute) | +| `baseURL` | string | Axios baseURL | +| `status` | number \| null | HTTP status (null on network error) | +| `durationMs` | number | Wall-clock time in milliseconds | +| `error` | string \| null | Error code + message, if any | +| `timeout` | number | Configured timeout in ms | ### Analysis @@ -229,12 +279,12 @@ rm -rf cw-api-logs/ Mutation endpoints invalidate the relevant cache keys so the next read fetches fresh data from CW: -| Mutation | Cache invalidated | -|---|---| -| Create/update/delete note | `opp:notes:{cwOpportunityId}` via `invalidateNotesCache()` | -| Create/update/delete contact | `opp:contacts:{cwOpportunityId}` via `invalidateContactsCache()` | +| Mutation | Cache invalidated | +| ------------------------------ | ---------------------------------------------------------------- | +| Create/update/delete note | `opp:notes:{cwOpportunityId}` via `invalidateNotesCache()` | +| Create/update/delete contact | `opp:contacts:{cwOpportunityId}` via `invalidateContactsCache()` | | Add/update/resequence products | `opp:products:{cwOpportunityId}` via `invalidateProductsCache()` | -| Refresh opportunity | All keys for that opportunity (via re-fetch) | +| Refresh opportunity | All keys for that opportunity (via re-fetch) | --- @@ -242,11 +292,11 @@ Mutation endpoints invalidate the relevant cache keys so the next read fetches f The shared Axios instance (`connectWiseApi`) is configured in `src/constants.ts`: -| Setting | Value | Purpose | -|---|---|---| -| `baseURL` | `https://ttscw.totaltech.net/v4_6_release/apis/3.0/` | CW API base | -| `timeout` | 30,000 ms (30s) | Per-request timeout | -| Logger | `attachCwApiLogger()` | Writes to `cw-api-calls.jsonl` | +| Setting | Value | Purpose | +| --------- | ---------------------------------------------------- | ------------------------------ | +| `baseURL` | `https://ttscw.totaltech.net/v4_6_release/apis/3.0/` | CW API base | +| `timeout` | 30,000 ms (30s) | Per-request timeout | +| Logger | `attachCwApiLogger()` | Writes to `cw-api-calls.jsonl` | --- @@ -255,7 +305,7 @@ The shared Axios instance (`connectWiseApi`) is configured in `src/constants.ts` ``` src/index.ts │ - ├─ setInterval(refreshOpportunityCache, 30s) + ├─ setInterval(refreshOpportunityCache, 20m) │ └─► src/modules/cache/opportunityCache.ts │ @@ -283,15 +333,16 @@ src/index.ts ## File reference -| File | Purpose | -|---|---| -| `src/modules/cache/opportunityCache.ts` | Cache read/write helpers, background refresh logic | -| `src/modules/algorithms/computeCacheTTL.ts` | Primary adaptive TTL algorithm | -| `src/modules/algorithms/computeSubResourceCacheTTL.ts` | Sub-resource (notes, contacts) TTL algorithm | -| `src/modules/algorithms/computeProductsCacheTTL.ts` | Products TTL algorithm | -| `src/modules/cw-utils/withCwRetry.ts` | Retry wrapper with exponential backoff | -| `src/modules/cw-utils/cwApiLogger.ts` | Axios interceptor for JSONL call logging | -| `src/modules/cw-utils/fetchCompany.ts` | Company fetch with retry | -| `src/constants.ts` | CW Axios instance config (timeout, logger) | -| `src/index.ts` | Refresh interval registration | -| `debug-scripts/analyze-cw-calls.py` | CW API call analysis script | +| File | Purpose | +| ---------------------------------------------------------------- | ------------------------------------------------------------- | +| `src/modules/cache/opportunityCache.ts` | Cache read/write helpers, background refresh logic | +| `src/modules/algorithms/computeCacheTTL.ts` | Primary adaptive TTL algorithm | +| `src/modules/algorithms/computeSubResourceCacheTTL.ts` | Sub-resource (notes, contacts) TTL algorithm | +| `src/modules/algorithms/computeProductsCacheTTL.ts` | Products TTL algorithm | +| `src/modules/cw-utils/withCwRetry.ts` | Retry wrapper with exponential backoff | +| `src/modules/cw-utils/cwApiLogger.ts` | Axios interceptor for JSONL call logging | +| `src/modules/cw-utils/fetchCompany.ts` | Company fetch with retry | +| `src/modules/cw-utils/procurement/listenInventoryAdjustments.ts` | Adjustment listener for targeted catalog-item cache + DB sync | +| `src/constants.ts` | CW Axios instance config (timeout, logger) | +| `src/index.ts` | Refresh interval registration | +| `debug-scripts/analyze-cw-calls.py` | CW API call analysis script | diff --git a/PERMISSIONS.md b/PERMISSIONS.md index 93bf921..b1b710b 100644 --- a/PERMISSIONS.md +++ b/PERMISSIONS.md @@ -124,6 +124,14 @@ Admin-specific UI permissions that control visibility and data loading for admin | `procurement.catalog.inventory.refresh` | Refresh on-hand inventory for a catalog item from ConnectWise | [src/api/procurement/[id]/refreshInventory.ts](src/api/procurement/[id]/refreshInventory.ts) | `procurement.catalog.fetch` | | `procurement.catalog.link` | Link or unlink catalog items to each other | [src/api/procurement/[id]/link.ts](src/api/procurement/[id]/link.ts), [src/api/procurement/[id]/unlink.ts](src/api/procurement/[id]/unlink.ts) | `procurement.catalog.fetch` | +### ConnectWise Callback Routes + +`POST /v1/cw/callback/:secret/:resource` is intentionally unauthenticated for inbound ConnectWise callbacks and does **not** require a permission node. + +| Permission Node | Description | Used In | Dependencies | +| --------------- | ------------------------------------------------------------------------------- | ------------------------------------------------ | ------------ | +| _None_ | Inbound callback route; secured operationally (network controls / source trust) | [src/api/cw/callback.ts](src/api/cw/callback.ts) | N/A | + ### Sales Permissions Permissions for accessing and managing sales opportunities. Opportunities are synced from ConnectWise and stored locally; sub-resources (products, notes, contacts) are fetched live from CW. diff --git a/debug-scripts/analyze_webhook_log.py b/debug-scripts/analyze_webhook_log.py new file mode 100644 index 0000000..e0d1038 --- /dev/null +++ b/debug-scripts/analyze_webhook_log.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python3 + +import argparse +import json +from collections import Counter, defaultdict +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +def parse_iso(value: str | None) -> datetime | None: + if not value: + return None + normalized = value.replace("Z", "+00:00") + try: + return datetime.fromisoformat(normalized) + except ValueError: + return None + + +def first_non_empty(*values: Any) -> str: + for value in values: + if value is None: + continue + if isinstance(value, str) and value.strip() == "": + continue + return str(value) + return "unknown" + + +def top_lines(counter: Counter[str], limit: int) -> list[str]: + return [f"{k}: {v}" for k, v in counter.most_common(limit)] + + +def fmt_pct(part: int, total: int) -> str: + if total == 0: + return "0.0%" + return f"{(part / total) * 100:.1f}%" + + +def human_duration(start: datetime | None, end: datetime | None) -> str: + if start is None or end is None: + return "unknown" + + delta = end - start + total_seconds = int(delta.total_seconds()) + hours, remainder = divmod(total_seconds, 3600) + minutes, seconds = divmod(remainder, 60) + return f"{hours}h {minutes}m {seconds}s" + + +def truncate(value: str, max_len: int = 90) -> str: + if len(value) <= max_len: + return value + return value[: max_len - 1] + "…" + + +def add_section(lines: list[str], title: str) -> None: + lines.append("") + lines.append(title) + lines.append("-" * len(title)) + + +def supports_color(enabled: bool) -> bool: + return enabled + + +def paint(text: str, code: str, use_color: bool) -> str: + if not use_color: + return text + return f"\033[{code}m{text}\033[0m" + + +def good_bad_neutral(value: str, state: str, use_color: bool) -> str: + if state == "good": + return paint(value, "32", use_color) + if state == "bad": + return paint(value, "31", use_color) + return paint(value, "36", use_color) + + +def add_ranked_counter( + lines: list[str], + title: str, + counter: Counter[str], + top_n: int, + total: int, + truncate_labels: bool = False, +) -> None: + lines.append(f"• {title}") + items = counter.most_common(top_n) + if not items: + lines.append(" (no data)") + return + + for index, (key, count) in enumerate(items, start=1): + label = truncate(key) if truncate_labels else key + lines.append(f" {index:>2}. {label:<90} {count:>4} {fmt_pct(count, total):>6}") + + +def stream_row_summary(row: dict[str, Any], use_color: bool, max_path: int) -> str: + request = row.get("request") or {} + response = row.get("response") or {} + body_parsed = request.get("bodyParsed") or {} + entity_parsed = request.get("entityParsed") or {} + summary = request.get("summary") or {} + + timestamp = parse_iso(row.get("timestamp")) + time_label = timestamp.astimezone(timezone.utc).strftime("%H:%M:%S") if timestamp else "--:--:--" + + method = first_non_empty(request.get("method")) + path = first_non_empty(request.get("path")) + endpoint = path.split("?", 1)[0] + status_code = first_non_empty(response.get("status")) + + event_type = first_non_empty(body_parsed.get("Type"), summary.get("type")) + action = first_non_empty( + body_parsed.get("Action"), + summary.get("action"), + request.get("query", {}).get("params", {}).get("action"), + ) + item_id = first_non_empty(body_parsed.get("ID"), summary.get("id"), request.get("query", {}).get("inferredId")) + actor = first_non_empty( + request.get("query", {}).get("params", {}).get("memberId"), + summary.get("entityUpdatedBy"), + entity_parsed.get("UpdatedBy"), + ) + entity_status = first_non_empty(entity_parsed.get("StatusName"), summary.get("entityStatus")) + + endpoint_label = truncate(endpoint, max_path) + status_state = "good" if status_code.startswith("2") else "bad" + status_colored = good_bad_neutral(status_code, status_state, use_color) + event_colored = paint(f"{event_type}.{action}", "36", use_color) + endpoint_colored = paint(endpoint_label, "94", use_color) + + return ( + f"[{time_label}] {method:<4} {endpoint_colored:<20} " + f"{status_colored:>3} {event_colored:<22} " + f"id={item_id:<7} actor={truncate(actor, 16):<16} status={truncate(entity_status, 22)}" + ) + + +def endpoint_stream_summary(log_path: Path, use_color: bool, max_path: int) -> str: + lines: list[str] = [] + lines.append(paint("ENDPOINT STREAM (chronological)", "1;95", use_color)) + lines.append(paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color)) + + count = 0 + invalid = 0 + with log_path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + + try: + row = json.loads(line) + except json.JSONDecodeError: + invalid += 1 + continue + + lines.append(stream_row_summary(row, use_color=use_color, max_path=max_path)) + count += 1 + + lines.append(paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color)) + lines.append( + f"events={count} invalid={good_bad_neutral(str(invalid), 'good' if invalid == 0 else 'bad', use_color)}" + ) + return "\n".join(lines) + + +@dataclass +class LogStats: + total_rows: int = 0 + parsed_rows: int = 0 + invalid_rows: int = 0 + earliest: datetime | None = None + latest: datetime | None = None + + methods: Counter[str] = None # type: ignore[assignment] + paths: Counter[str] = None # type: ignore[assignment] + endpoint_roots: Counter[str] = None # type: ignore[assignment] + response_statuses: Counter[str] = None # type: ignore[assignment] + event_types: Counter[str] = None # type: ignore[assignment] + actions: Counter[str] = None # type: ignore[assignment] + type_action_combo: Counter[str] = None # type: ignore[assignment] + company_ids: Counter[str] = None # type: ignore[assignment] + + source_members: Counter[str] = None # type: ignore[assignment] + actor_members: Counter[str] = None # type: ignore[assignment] + entity_updated_by: Counter[str] = None # type: ignore[assignment] + + requests_by_hour: Counter[str] = None # type: ignore[assignment] + requests_by_minute: Counter[str] = None # type: ignore[assignment] + endpoint_by_hour: dict[str, Counter[str]] = None # type: ignore[assignment] + + def __post_init__(self) -> None: + self.methods = Counter() + self.paths = Counter() + self.endpoint_roots = Counter() + self.response_statuses = Counter() + self.event_types = Counter() + self.actions = Counter() + self.type_action_combo = Counter() + self.company_ids = Counter() + + self.source_members = Counter() + self.actor_members = Counter() + self.entity_updated_by = Counter() + + self.requests_by_hour = Counter() + self.requests_by_minute = Counter() + self.endpoint_by_hour = defaultdict(Counter) + + def add_timestamp(self, timestamp: datetime | None) -> None: + if timestamp is None: + return + + self.earliest = timestamp if self.earliest is None else min(self.earliest, timestamp) + self.latest = timestamp if self.latest is None else max(self.latest, timestamp) + + hour_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC") + minute_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + self.requests_by_hour[hour_bucket] += 1 + self.requests_by_minute[minute_bucket] += 1 + + def summarize(self, top_n: int, busiest_n: int, use_color: bool) -> str: + duration_line = human_duration(self.earliest, self.latest) + time_range_line = "unknown" + if self.earliest and self.latest: + time_range_line = f"{self.earliest.isoformat()} → {self.latest.isoformat()}" + + total_requests = self.parsed_rows + success_count = self.response_statuses.get("200", 0) + success_pct = fmt_pct(success_count, sum(self.response_statuses.values())) + invalid_state = "good" if self.invalid_rows == 0 else "bad" + + top_endpoints = self.endpoint_roots.most_common(2) + top_users = self.actor_members.most_common(3) + top_minutes = self.requests_by_minute.most_common(busiest_n) + + lines: list[str] = [] + lines.append(paint("WEBHOOK SNAPSHOT", "1;95", use_color)) + lines.append(paint("────────────────────────────────────────────────────────", "90", use_color)) + lines.append( + " " + + paint("Rows", "1;97", use_color) + + f": {self.total_rows:<4} " + + paint("Parsed", "1;97", use_color) + + f": {self.parsed_rows:<4} " + + paint("Invalid", "1;97", use_color) + + f": {good_bad_neutral(str(self.invalid_rows), invalid_state, use_color)}" + ) + lines.append( + " " + + paint("Window", "1;97", use_color) + + f": {duration_line:<12} " + + paint("Success", "1;97", use_color) + + f": {good_bad_neutral(success_pct, 'good' if success_count else 'neutral', use_color)}" + ) + lines.append(" " + paint("UTC Range", "1;97", use_color) + f": {time_range_line}") + + lines.append("") + lines.append(paint("Top Endpoints", "1;94", use_color)) + if top_endpoints: + for endpoint, count in top_endpoints: + lines.append(f" • {endpoint:<14} {count:>4} ({fmt_pct(count, total_requests)})") + if not top_endpoints: + lines.append(" • (no data)") + + lines.append("") + lines.append(paint("Most Active Users (query memberId)", "1;94", use_color)) + if top_users: + for user, count in top_users: + lines.append(f" • {user:<18} {count:>4} ({fmt_pct(count, total_requests)})") + if not top_users: + lines.append(" • (no data)") + + lines.append("") + lines.append(paint("Busiest Minutes", "1;94", use_color)) + if top_minutes: + for minute, count in top_minutes: + lines.append(f" • {minute:<22} {count:>3}") + if not top_minutes: + lines.append(" • (no data)") + + lines.append("") + lines.append(paint("Request Mix", "1;94", use_color)) + method_line = ", ".join([f"{k}:{v}" for k, v in self.methods.most_common(3)]) or "(no data)" + event_line = ", ".join([f"{k}:{v}" for k, v in self.event_types.most_common(3)]) or "(no data)" + action_line = ", ".join([f"{k}:{v}" for k, v in self.actions.most_common(3)]) or "(no data)" + lines.append(f" • Methods : {method_line}") + lines.append(f" • Types : {event_line}") + lines.append(f" • Actions : {action_line}") + + lines.append("") + lines.append(paint("Status Codes", "1;94", use_color)) + if self.response_statuses: + status_total = sum(self.response_statuses.values()) + for status, count in self.response_statuses.most_common(5): + state = "good" if status.startswith("2") else "bad" + status_label = good_bad_neutral(status, state, use_color) + lines.append(f" • {status_label}: {count} ({fmt_pct(count, status_total)})") + if not self.response_statuses: + lines.append(" • (no data)") + + return "\n".join(lines) + + +def update_stats(stats: LogStats, row: dict[str, Any]) -> None: + timestamp = parse_iso(row.get("timestamp")) + stats.add_timestamp(timestamp) + + request = row.get("request") or {} + response = row.get("response") or {} + body_parsed = request.get("bodyParsed") or {} + entity_parsed = request.get("entityParsed") or {} + + method = first_non_empty(request.get("method")) + path = first_non_empty(request.get("path")) + endpoint_root = path.split("?", 1)[0] + status = first_non_empty(response.get("status")) + + event_type = first_non_empty( + body_parsed.get("Type"), + request.get("summary", {}).get("type"), + ) + action = first_non_empty( + body_parsed.get("Action"), + request.get("summary", {}).get("action"), + request.get("query", {}).get("params", {}).get("action"), + ) + combo = f"{event_type}:{action}" + + source_member = first_non_empty( + body_parsed.get("MemberId"), + request.get("summary", {}).get("memberId"), + ) + actor_member = first_non_empty( + request.get("query", {}).get("params", {}).get("memberId"), + request.get("summary", {}).get("entityUpdatedBy"), + ) + updated_by = first_non_empty( + entity_parsed.get("UpdatedBy"), + request.get("summary", {}).get("entityUpdatedBy"), + ) + company_id = first_non_empty(body_parsed.get("CompanyId"), request.get("headers", {}).get("companyname")) + + stats.methods[method] += 1 + stats.paths[path] += 1 + stats.endpoint_roots[endpoint_root] += 1 + stats.response_statuses[status] += 1 + stats.event_types[event_type] += 1 + stats.actions[action] += 1 + stats.type_action_combo[combo] += 1 + stats.company_ids[company_id] += 1 + + stats.source_members[source_member] += 1 + stats.actor_members[actor_member] += 1 + stats.entity_updated_by[updated_by] += 1 + + if timestamp: + bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC") + stats.endpoint_by_hour[endpoint_root][bucket] += 1 + + +def analyze_file(log_path: Path) -> LogStats: + stats = LogStats() + + with log_path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + + stats.total_rows += 1 + try: + row = json.loads(line) + except json.JSONDecodeError: + stats.invalid_rows += 1 + continue + + stats.parsed_rows += 1 + update_stats(stats, row) + + return stats + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Analyze webhook JSONL logs by users, time, and request types." + ) + parser.add_argument("log_file", help="Path to JSONL log file") + parser.add_argument("--top", type=int, default=10, help="Top N entries per section (default: 10)") + parser.add_argument( + "--busiest-minutes", + type=int, + default=5, + help="How many top minute buckets to show (default: 5)", + ) + parser.add_argument( + "--no-color", + action="store_true", + help="Disable ANSI colors", + ) + parser.add_argument( + "--endpoint-stream", + action="store_true", + help="Show chronological one-line summary per webhook, similar to live test webserver logs", + ) + parser.add_argument( + "--max-path", + type=int, + default=18, + help="Max endpoint width in stream mode before truncation (default: 18)", + ) + args = parser.parse_args() + + log_path = Path(args.log_file) + if not log_path.exists() or not log_path.is_file(): + raise SystemExit(f"Log file not found: {log_path}") + + use_color = supports_color(not args.no_color) + + if args.endpoint_stream: + print(endpoint_stream_summary(log_path, use_color=use_color, max_path=max(args.max_path, 10))) + return + + stats = analyze_file(log_path) + print( + stats.summarize( + top_n=max(args.top, 1), + busiest_n=max(args.busiest_minutes, 1), + use_color=use_color, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/package.json b/package.json index 1aaa81e..3890201 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,8 @@ "utils:gen_private_keys": "bun ./utils/genPrivateKeys", "utils:create_admin_role": "bun ./utils/createAdminRole", "utils:assign_user_role": "bun ./utils/assignUserRole", + "utils:test_webserver": "bun ./utils/testWebserver.ts", + "utils:test_adjustments_poll": "bun ./utils/testAdjustmentsPoll.ts", "utils:analyze_cw": "python3 debug-scripts/analyze-cw-calls.py", "db:check": "bunx prisma migrate diff --from-migrations prisma/migrations --to-schema prisma/schema.prisma --shadow-database-url $DATABASE_URL --exit-code" }, diff --git a/src/api/cw/callback.ts b/src/api/cw/callback.ts new file mode 100644 index 0000000..7cc35bb --- /dev/null +++ b/src/api/cw/callback.ts @@ -0,0 +1,164 @@ +import { createRoute } from "../../modules/api-utils/createRoute"; +import { apiResponse } from "../../modules/api-utils/apiResponse"; +import { ContentfulStatusCode } from "hono/utils/http-status"; +import { z } from "zod"; +import GenericError from "../../Errors/GenericError"; + +type ParsedJson = Record | unknown[]; + +const callbackResource = z.enum([ + "opportunity", + "ticket", + "company", + "activity", +]); + +const safeParseJson = (value: string): ParsedJson | null => { + try { + const parsed = JSON.parse(value); + const isObject = typeof parsed === "object" && parsed !== null; + + return isObject ? (parsed as ParsedJson) : null; + } catch { + return null; + } +}; + +const asObject = (value: ParsedJson | null): Record | null => { + if (!value) return null; + if (Array.isArray(value)) return null; + + return value; +}; + +const parseJsonStringFields = ( + value: Record | null, +): Record | null => { + if (!value) return null; + + return Object.entries(value).reduce>( + (acc, [key, current]) => { + if (typeof current !== "string") { + acc[key] = current; + + return acc; + } + + const looksLikeJson = current.startsWith("{") || current.startsWith("["); + if (!looksLikeJson) { + acc[key] = current; + + return acc; + } + + const parsed = safeParseJson(current); + acc[key] = parsed ?? current; + + return acc; + }, + {}, + ); +}; + +const parseEntity = (value: unknown): ParsedJson | null => { + if (typeof value === "string") return safeParseJson(value); + if (typeof value !== "object" || value === null) return null; + + return value as ParsedJson; +}; + +const buildSummary = ( + resource: z.infer, + parsedBody: Record | null, + parsedEntity: Record | null, +) => { + if (!parsedBody) return null; + + return { + resource, + messageId: parsedBody.MessageId ?? null, + action: parsedBody.Action ?? null, + type: parsedBody.Type ?? null, + id: parsedBody.ID ?? null, + memberId: parsedBody.MemberId ?? null, + entityStatus: + parsedEntity?.StatusName ?? + parsedEntity?.TicketStatus ?? + parsedEntity?.Status ?? + null, + entitySummary: parsedEntity?.Summary ?? parsedEntity?.CompanyName ?? null, + entityUpdatedBy: parsedEntity?.UpdatedBy ?? null, + entityLastUpdated: + parsedEntity?.LastUpdatedUTC ?? parsedEntity?.LastUpdated ?? null, + }; +}; + +const parseHeaders = (headers: Headers): Record => + Object.fromEntries(headers.entries()); + +const callbackHeaderSummary = (headers: Record) => ({ + contentType: headers["content-type"] ?? null, + userAgent: headers["user-agent"] ?? null, + host: headers.host ?? null, + forwardedFor: headers["x-forwarded-for"] ?? null, + callbackId: + headers["x-cw-request-id"] ?? + headers["x-request-id"] ?? + headers["x-correlation-id"] ?? + null, +}); + +/* /v1/cw/callback/:resource */ +export default createRoute("post", ["/callback/:secret/:resource"], async (c) => { + const suppliedSecret = c.req.param("secret"); + const expectedSecret = process.env.CW_CALLBACK_SECRET; + + if (expectedSecret && suppliedSecret !== expectedSecret) { + throw new GenericError({ + name: "Unauthorized", + message: "Invalid callback secret.", + cause: "Path secret mismatch", + status: 401, + }); + } + + if (!expectedSecret) { + console.warn( + "[cw-callback] CW_CALLBACK_SECRET is not configured; accepting path secret without verification", + ); + } + + const resource = callbackResource.parse(c.req.param("resource")); + const headers = parseHeaders(c.req.raw.headers); + const headerSummary = callbackHeaderSummary(headers); + const rawBody = await c.req.text(); + const parsedJson = safeParseJson(rawBody); + const parsedBody = asObject(parsedJson); + const parsedBodyExpanded = parseJsonStringFields(parsedBody); + const parsedEntity = asObject(parseEntity(parsedBodyExpanded?.Entity)); + const summary = buildSummary(resource, parsedBodyExpanded, parsedEntity); + + const line = [ + `[cw-callback] resource=${resource}`, + `action=${String(summary?.action ?? "-")}`, + `type=${String(summary?.type ?? "-")}`, + `id=${String(summary?.id ?? "-")}`, + `by=${String(summary?.entityUpdatedBy ?? summary?.memberId ?? "-")}`, + `requestId=${String(headerSummary.callbackId ?? "-")}`, + `status=${String(summary?.entityStatus ?? "-")}`, + `summary=${String(summary?.entitySummary ?? "-")}`, + ].join(" "); + console.log(line); + + const response = apiResponse.successful("CW callback received.", { + resource, + secretValidated: Boolean(expectedSecret), + summary, + headers, + headerSummary, + bodyParsed: parsedBodyExpanded, + receivedAt: new Date().toISOString(), + }); + + return c.json(response, response.status as ContentfulStatusCode); +}); diff --git a/src/api/cw/index.ts b/src/api/cw/index.ts new file mode 100644 index 0000000..924e894 --- /dev/null +++ b/src/api/cw/index.ts @@ -0,0 +1,3 @@ +import { default as callback } from "./callback"; + +export { callback }; diff --git a/src/api/routers/cwRouter.ts b/src/api/routers/cwRouter.ts new file mode 100644 index 0000000..2ddae57 --- /dev/null +++ b/src/api/routers/cwRouter.ts @@ -0,0 +1,7 @@ +import { Hono } from "hono"; +import * as cwRoutes from "../cw"; + +const cwRouter = new Hono(); +Object.values(cwRoutes).map((r) => cwRouter.route("/", r)); + +export default cwRouter; diff --git a/src/api/sales/[id]/fetch.ts b/src/api/sales/[id]/fetch.ts index 4f300ff..18f5ee0 100644 --- a/src/api/sales/[id]/fetch.ts +++ b/src/api/sales/[id]/fetch.ts @@ -4,12 +4,27 @@ import { apiResponse } from "../../../modules/api-utils/apiResponse"; import { ContentfulStatusCode } from "hono/utils/http-status"; import { authMiddleware } from "../../middleware/authorization"; import { processObjectValuePerms } from "../../../modules/permission-utils/processObjectPermissions"; +import GenericError from "../../../Errors/GenericError"; +import { prisma } from "../../../constants"; +import { computeSubResourceCacheTTL } from "../../../modules/algorithms/computeSubResourceCacheTTL"; +import { computeProductsCacheTTL } from "../../../modules/algorithms/computeProductsCacheTTL"; +import { + getCachedSite, + getCachedNotes, + getCachedContacts, + getCachedProducts, + fetchAndCacheNotes, + fetchAndCacheContacts, + fetchAndCacheProducts, + fetchAndCacheSite, +} from "../../../modules/cache/opportunityCache"; /* GET /v1/sales/opportunities/:identifier?include=notes,contacts,products */ export default createRoute( "get", ["/opportunities/:identifier"], async (c) => { + const t0 = performance.now(); const identifier = c.req.param("identifier"); const includeParam = c.req.query("include") ?? ""; const includes = new Set( @@ -19,46 +34,175 @@ export default createRoute( .filter(Boolean), ); - const item = await opportunities.fetchItem(identifier); + // ── Quick DB lookup (≈3ms) to get cwOpportunityId for pre-warming ── + const isNumeric = /^\d+$/.test(identifier); + const dbRecord = await prisma.opportunity.findFirst({ + where: isNumeric + ? { cwOpportunityId: Number(identifier) } + : { id: identifier }, + select: { + cwOpportunityId: true, + companyCwId: true, + siteCwId: true, + closedFlag: true, + closedDate: true, + expectedCloseDate: true, + cwLastUpdated: true, + statusCwId: true, + }, + }); - // Eagerly load site data so toJson() includes full site info - await item.fetchSite(); + if (!dbRecord) { + throw new GenericError({ + message: "Opportunity not found", + name: "OpportunityNotFound", + cause: `No opportunity exists with identifier '${identifier}'`, + status: 404, + }); + } + // Compute TTLs from DB state + const subTtl = computeSubResourceCacheTTL({ + closedFlag: dbRecord.closedFlag, + closedDate: dbRecord.closedDate, + expectedCloseDate: dbRecord.expectedCloseDate, + lastUpdated: dbRecord.cwLastUpdated, + }); + const prodTtl = computeProductsCacheTTL({ + closedFlag: dbRecord.closedFlag, + closedDate: dbRecord.closedDate, + expectedCloseDate: dbRecord.expectedCloseDate, + lastUpdated: dbRecord.cwLastUpdated, + statusCwId: dbRecord.statusCwId, + }); + + // ── Pre-warm sub-resources only on cache miss ─────────────────────── + // Check Redis first — if the background refresh has kept the keys warm, + // skip the CW calls entirely. Only fetch-and-cache on a miss. + const cwOppId = dbRecord.cwOpportunityId; + const _pw0 = performance.now(); + const _wrapPw = (label: string, p: Promise) => + p + .then((r) => { + console.log( + `[PERF:prewarm] ${label}: ${(performance.now() - _pw0).toFixed(0)}ms`, + ); + return r; + }) + .catch(() => {}); + + const prewarmPromises: Promise[] = []; + if (dbRecord.companyCwId && dbRecord.siteCwId) { + const compId = dbRecord.companyCwId, + siteId = dbRecord.siteCwId; + prewarmPromises.push( + _wrapPw( + "site", + getCachedSite(compId, siteId).then( + (c) => c ?? fetchAndCacheSite(compId, siteId), + ), + ), + ); + } + if (includes.has("notes") && subTtl) + prewarmPromises.push( + _wrapPw( + "notes", + getCachedNotes(cwOppId).then( + (c) => c ?? fetchAndCacheNotes(cwOppId, subTtl), + ), + ), + ); + if (includes.has("contacts") && subTtl) + prewarmPromises.push( + _wrapPw( + "contacts", + getCachedContacts(cwOppId).then( + (c) => c ?? fetchAndCacheContacts(cwOppId, subTtl), + ), + ), + ); + if (includes.has("products") && prodTtl) + prewarmPromises.push( + _wrapPw( + "products", + getCachedProducts(cwOppId).then( + (c) => c ?? fetchAndCacheProducts(cwOppId, prodTtl), + ), + ), + ); + + // fetchItem runs its own CW calls (opp, activities, company) — + // these execute concurrently with the sub-resource pre-warming above. + const [item] = await Promise.all([ + opportunities.fetchItem(identifier), + ...prewarmPromises, + ]); + const t1 = performance.now(); + console.log(`[PERF] fetchItem + prewarm: ${(t1 - t0).toFixed(0)}ms`); + + // Sub-resources now hit warm Redis cache (near-instant) + const _st = performance.now(); + const _wrapTimed = (label: string, p: Promise) => + p.then((r) => { + console.log( + `[PERF:sub] ${label}: ${(performance.now() - _st).toFixed(0)}ms`, + ); + return r; + }); + + const subResourcePromises: Record> = { + _site: _wrapTimed("site", item.fetchSite()), + }; + if (includes.has("notes")) { + subResourcePromises.notes = _wrapTimed("notes", item.fetchNotes()); + } + if (includes.has("contacts")) { + subResourcePromises.contacts = _wrapTimed( + "contacts", + item.fetchContacts(), + ); + } + if (includes.has("products")) { + subResourcePromises.products = _wrapTimed( + "products", + item + .fetchProducts() + .then((products) => products.map((p) => p.toJson())), + ); + } + + const keys = Object.keys(subResourcePromises); + const results = await Promise.all(keys.map((k) => subResourcePromises[k])); + const t2 = performance.now(); + console.log( + `[PERF] sub-resources (${keys.join(",")}): ${(t2 - t1).toFixed(0)}ms`, + ); + + // Apply toJson after site is hydrated (side-effect from fetchSite) const gatedData = await processObjectValuePerms( item.toJson(), "obj.opportunity", c.get("user"), ); + const t3 = performance.now(); + console.log(`[PERF] processObjectValuePerms: ${(t3 - t2).toFixed(0)}ms`); - // Fetch requested sub-resources in parallel - const subResourcePromises: Record> = {}; - if (includes.has("notes")) { - subResourcePromises.notes = item.fetchNotes(); - } - if (includes.has("contacts")) { - subResourcePromises.contacts = item.fetchContacts(); - } - if (includes.has("products")) { - subResourcePromises.products = item - .fetchProducts() - .then((products) => products.map((p) => p.toJson())); - } - - const keys = Object.keys(subResourcePromises); - if (keys.length > 0) { - const results = await Promise.all( - keys.map((k) => subResourcePromises[k]), - ); - keys.forEach((k, i) => { + // Attach sub-resources (skip the internal _site key) + keys.forEach((k, i) => { + if (k !== "_site") { (gatedData as any)[k] = results[i]; - }); - } + } + }); const response = apiResponse.successful( "Opportunity fetched successfully!", gatedData, ); + console.log( + `[PERF] total handler: ${(performance.now() - t0).toFixed(0)}ms (includes=${includeParam || "none"})`, + ); return c.json(response, response.status as ContentfulStatusCode); }, authMiddleware({ permissions: ["sales.opportunity.fetch"] }), diff --git a/src/api/server.ts b/src/api/server.ts index 0153666..9c8d3cf 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -57,6 +57,7 @@ v1.route("/permissions", require("./routers/permissionRouter").default); v1.route("/unifi", require("./routers/unifiRouter").default); v1.route("/procurement", require("./routers/procurementRouter").default); v1.route("/sales", require("./routers/salesRouter").default); +v1.route("/cw", require("./routers/cwRouter").default); app.route("/v1", v1); export default app; diff --git a/src/controllers/CompanyController.ts b/src/controllers/CompanyController.ts index 50f9b7c..4b5342b 100644 --- a/src/controllers/CompanyController.ts +++ b/src/controllers/CompanyController.ts @@ -50,18 +50,21 @@ export class CompanyController { const cwCompany = await fetchCwCompanyById(this.cw_CompanyId); if (!cwCompany) return this; - const contactHref = cwCompany.defaultContact?._info?.contact_href; - const defaultContactData = contactHref - ? await connectWiseApi.get(contactHref) - : undefined; - const allContactsData = await connectWiseApi.get( `${cwCompany._info.contacts_href}&pageSize=1000`, ); + // Derive default contact from allContacts instead of a separate CW call + const defaultContactId = cwCompany.defaultContact?.id; + const defaultContactData = defaultContactId + ? ((allContactsData.data as any[]).find( + (c: any) => c.id === defaultContactId, + ) ?? null) + : null; + this.cw_Data = { company: cwCompany, - defaultContact: defaultContactData?.data ?? null, + defaultContact: defaultContactData, allContacts: allContactsData.data, }; diff --git a/src/controllers/OpportunityController.ts b/src/controllers/OpportunityController.ts index 300f5ec..385ae4e 100644 --- a/src/controllers/OpportunityController.ts +++ b/src/controllers/OpportunityController.ts @@ -15,7 +15,10 @@ import { CWOpportunity, CWOpportunityNote, } from "../modules/cw-utils/opportunities/opportunity.types"; -import { resolveMember } from "../modules/cw-utils/members/memberCache"; +import { + resolveMember, + resolveMembers, +} from "../modules/cw-utils/members/memberCache"; import { ForecastProductController } from "./ForecastProductController"; import GenericError from "../Errors/GenericError"; import { computeSubResourceCacheTTL } from "../modules/algorithms/computeSubResourceCacheTTL"; @@ -429,18 +432,25 @@ export class OpportunityController { /** Serialize raw CW note data into the API response shape. */ private async _serializeNotes(notes: any[]) { - return Promise.all( - notes.map(async (n: any) => ({ - id: n.id, - text: n.text, - type: n.type ? { id: n.type.id, name: n.type.name } : null, - flagged: n.flagged, - dateEntered: n._info?.lastUpdated - ? new Date(n._info.lastUpdated) - : null, - enteredBy: await resolveMember(n.enteredBy), - })), - ); + // Batch-resolve all member identifiers in a single DB query + const identifiers = notes + .map((n: any) => n.enteredBy as string) + .filter(Boolean); + const memberMap = await resolveMembers(identifiers); + + return notes.map((n: any) => ({ + id: n.id, + text: n.text, + type: n.type ? { id: n.type.id, name: n.type.name } : null, + flagged: n.flagged, + dateEntered: n._info?.lastUpdated ? new Date(n._info.lastUpdated) : null, + enteredBy: memberMap.get(n.enteredBy) ?? { + id: null, + identifier: n.enteredBy, + name: n.enteredBy, + cwMemberId: null, + }, + })); } /** diff --git a/src/index.ts b/src/index.ts index 8f83cc5..477b9bd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { unifiSites } from "./managers/unifiSites"; import { refreshCompanies } from "./modules/cw-utils/refreshCompanies"; import { refreshCatalog } from "./modules/cw-utils/procurement/refreshCatalog"; import { refreshInventory } from "./modules/cw-utils/procurement/refreshInventory"; +import { listenInventoryAdjustments } from "./modules/cw-utils/procurement/listenInventoryAdjustments"; import { refreshOpportunities } from "./modules/cw-utils/opportunities/refreshOpportunities"; import { refreshOpportunityCache } from "./modules/cache/opportunityCache"; import { refreshCwIdentifiers } from "./modules/cw-utils/members/refreshCwIdentifiers"; @@ -105,25 +106,37 @@ setInterval(() => { ); }, 60 * 1000); -// Refresh the internal catalog every minute +// Refresh the internal catalog every 30 minutes await safeStartup("refreshCatalog", refreshCatalog); -setInterval(() => { - return refreshCatalog().catch((err) => - console.error(`[interval] refreshCatalog failed: ${briefErr(err)}`), - ); -}, 60 * 1000); +setInterval( + () => { + return refreshCatalog().catch((err) => + console.error(`[interval] refreshCatalog failed: ${briefErr(err)}`), + ); + }, + 30 * 60 * 1000, +); -// Refresh inventory on hand every 2 minutes -await safeStartup("refreshInventory", refreshInventory); +// Fallback full inventory sweep every 6 hours (listener handles real-time deltas) setInterval( () => { return refreshInventory().catch((err) => console.error(`[interval] refreshInventory failed: ${briefErr(err)}`), ); }, - 2 * 60 * 1000, + 6 * 60 * 60 * 1000, ); +// Listen for procurement adjustment changes and sync changed products to DB + cache +await safeStartup("listenInventoryAdjustments", listenInventoryAdjustments); +setInterval(() => { + return listenInventoryAdjustments().catch((err) => + console.error( + `[interval] listenInventoryAdjustments failed: ${briefErr(err)}`, + ), + ); +}, 60 * 1000); + // Refresh opportunities every minute await safeStartup("refreshOpportunities", refreshOpportunities); setInterval(() => { @@ -132,7 +145,7 @@ setInterval(() => { ); }, 60 * 1000); -// Refresh opportunity CW cache every 30 seconds (activities + company hydration) +// Refresh opportunity CW cache every 20 minutes (activities + company hydration) // NOTE: Do NOT await — register the interval immediately so the cache refresh // is never blocked by a slow/stuck startup task above. safeStartup("refreshOpportunityCache", refreshOpportunityCache); @@ -142,7 +155,7 @@ setInterval(() => { `[interval] refreshOpportunityCache failed: ${briefErr(err)}`, ); }); -}, 30 * 1000); +}, 20 * 60 * 1000); // Refresh User Defined Fields every 5 minutes await safeStartup("refreshUDFs", () => userDefinedFieldsCw.refresh()); diff --git a/src/managers/opportunities.ts b/src/managers/opportunities.ts index 5b5a8dc..60b05c0 100644 --- a/src/managers/opportunities.ts +++ b/src/managers/opportunities.ts @@ -42,16 +42,22 @@ async function buildCompanyController( ttlMs?: number; }, ): Promise { + const _ct0 = performance.now(); const strategy = opts?.strategy ?? "cache-then-cw"; const ctrl = new CompanyController(company); - // ── cw-first: always fetch from CW ────────────────────────────────── + // ── cw-first: always fetch from CW (and cache the result) ────────── if (strategy === "cw-first") { - await ctrl.hydrateCwData(); - if (ctrl.cw_Data && opts?.ttlMs) { - await fetchAndCacheCompanyCwData(company.cw_CompanyId, opts.ttlMs).catch( - () => {}, - ); + const blob = opts?.ttlMs + ? await fetchAndCacheCompanyCwData( + company.cw_CompanyId, + opts.ttlMs, + ).catch(() => null) + : null; + if (blob) { + ctrl.cw_Data = blob; + } else { + await ctrl.hydrateCwData(); } return ctrl; } @@ -66,14 +72,20 @@ async function buildCompanyController( // cache-only stops here — return the bare DB-backed controller if (strategy === "cache-only") return ctrl; - // cache-then-cw: cache miss — fall through to CW - await ctrl.hydrateCwData(); - if (ctrl.cw_Data && opts?.ttlMs) { - await fetchAndCacheCompanyCwData(company.cw_CompanyId, opts.ttlMs).catch( - () => {}, - ); + // cache-then-cw: cache miss — fetch from CW once and cache in one pass + if (opts?.ttlMs) { + const blob = await fetchAndCacheCompanyCwData( + company.cw_CompanyId, + opts.ttlMs, + ).catch(() => null); + if (blob) ctrl.cw_Data = blob; + } else { + await ctrl.hydrateCwData(); } + console.log( + `[PERF:buildCompany] ${(performance.now() - _ct0).toFixed(0)}ms (strategy=${strategy}, hit=miss)`, + ); return ctrl; } @@ -93,17 +105,14 @@ async function buildActivities( ttlMs?: number; }, ): Promise { + const _at0 = performance.now(); const strategy = opts?.strategy ?? "cache-then-cw"; - // ── cw-first: always fetch from CW ────────────────────────────────── + // ── cw-first: always fetch from CW (and cache the result) ────────── if (strategy === "cw-first") { - const collection = await activityCw.fetchByOpportunity(cwOpportunityId); - const arr = collection.map((item) => item); - if (opts?.ttlMs) { - await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs).catch( - () => {}, - ); - } + const arr = opts?.ttlMs + ? await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs) + : await activityCw.fetchByOpportunityDirect(cwOpportunityId); return arr.map((item) => new ActivityController(item)); } @@ -116,12 +125,13 @@ async function buildActivities( // cache-only stops here — return empty (background job will fill it) if (strategy === "cache-only") return []; - // cache-then-cw: cache miss — fall through to CW - const collection = await activityCw.fetchByOpportunity(cwOpportunityId); - const arr = collection.map((item) => item); - if (opts?.ttlMs) { - await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs).catch(() => {}); - } + // cache-then-cw: cache miss — fetch once and cache in one pass + const arr = opts?.ttlMs + ? await fetchAndCacheActivities(cwOpportunityId, opts.ttlMs) + : await activityCw.fetchByOpportunityDirect(cwOpportunityId); + console.log( + `[PERF:buildActivities] ${(performance.now() - _at0).toFixed(0)}ms (strategy=${strategy}, hit=miss, count=${arr.length})`, + ); return arr.map((item) => new ActivityController(item)); } @@ -192,6 +202,7 @@ export const opportunities = { identifier: string | number, opts?: { fresh?: boolean }, ): Promise { + const _t0 = performance.now(); const strategy: "cache-only" | "cache-then-cw" | "cw-first" = opts?.fresh ? "cw-first" : "cache-then-cw"; @@ -205,6 +216,8 @@ export const opportunities = { : { id: identifier as string }, include: { company: true }, }); + const _t1 = performance.now(); + console.log(`[PERF:fetchItem] DB lookup: ${(_t1 - _t0).toFixed(0)}ms`); if (!existing) { throw new GenericError({ @@ -232,12 +245,26 @@ export const opportunities = { // Try the Redis cache first cwData = await getCachedOppCwData(existing.cwOpportunityId); } + const _t2 = performance.now(); + console.log( + `[PERF:fetchItem] Redis cache check: ${(_t2 - _t1).toFixed(0)}ms (hit=${!!cwData})`, + ); + + // ── Parallel block: CW opp fetch + activities + company ──────────── + // Activities and company hydration only need existing.cwOpportunityId + // and existing.company — both available from the initial DB lookup — + // so they can run concurrently with the CW opp fetch + DB update. + + const cwOppPromise = (async () => { + if (cwData) return; // cache hit — nothing to do - if (!cwData) { - // Cache miss or forced fresh — fetch from CW and cache cwData = ttlMs ? await fetchAndCacheOppCwData(existing.cwOpportunityId, ttlMs) : await opportunityCw.fetch(existing.cwOpportunityId); + const _t2b = performance.now(); + console.log( + `[PERF:fetchItem] CW opp fetch: ${(_t2b - _t2).toFixed(0)}ms`, + ); if (!cwData) { throw new GenericError({ @@ -264,15 +291,27 @@ export const opportunities = { data: { ...mapped, companyId }, include: { company: true }, }); - } + console.log( + `[PERF:fetchItem] DB update: ${(performance.now() - _t2b).toFixed(0)}ms`, + ); + })(); - // Hydrate activities and company in parallel - const [activities, company] = await Promise.all([ - buildActivities(record.cwOpportunityId, { strategy, ttlMs }), - record.company - ? buildCompanyController(record.company, { strategy, ttlMs }) + const _t3 = performance.now(); + // Hydrate activities and company in parallel with CW opp fetch + const [, activities, company] = await Promise.all([ + cwOppPromise, + buildActivities(existing.cwOpportunityId, { strategy, ttlMs }), + existing.company + ? buildCompanyController(existing.company, { strategy, ttlMs }) : Promise.resolve(undefined), ]); + const _t4 = performance.now(); + console.log( + `[PERF:fetchItem] parallel block (cw+activities+company): ${(_t4 - _t3).toFixed(0)}ms`, + ); + console.log( + `[PERF:fetchItem] TOTAL: ${(_t4 - _t0).toFixed(0)}ms (strategy=${strategy}, ttl=${ttlMs}ms)`, + ); return new OpportunityController(record, { company, diff --git a/src/modules/algorithms/computeCacheTTL.ts b/src/modules/algorithms/computeCacheTTL.ts index 636dcf4..63d06f5 100644 --- a/src/modules/algorithms/computeCacheTTL.ts +++ b/src/modules/algorithms/computeCacheTTL.ts @@ -16,8 +16,8 @@ * | # | Condition | TTL (ms) | TTL (human) | Rationale | * |---|------------------------------------------------------------------|----------|-------------|--------------------------------------------------------------------| * | 1 | `closedFlag` is `true` | `null` | Do not cache| Closed records are rarely accessed; caching wastes memory. | - * | 2 | `expectedCloseDate` OR `lastUpdated` is within the last **5 days**| 30 000 | 30 seconds | High-activity window — data changes frequently and must stay fresh.| - * | 3 | `expectedCloseDate` OR `lastUpdated` is within the last **14 days**| 60 000 | 60 seconds | Moderate activity — still relevant, but changes less often. | + * | 2 | `expectedCloseDate` OR `lastUpdated` is within the last **5 days**| 60 000 | 60 seconds | High-activity window — data changes frequently and must stay fresh.| + * | 3 | `expectedCloseDate` OR `lastUpdated` is within the last **14 days**| 90 000 | 90 seconds | Moderate activity — still relevant, but changes less often. | * | 4 | Everything else (older than 14 days) | 900 000 | 15 minutes | Low activity — safe to serve from cache for longer. | * * ## Evaluation order @@ -62,11 +62,13 @@ // Constants // --------------------------------------------------------------------------- -/** 30 seconds – TTL for high-activity records (within 5 days). */ -export const TTL_HIGH_ACTIVITY = 30_000; +/** 60 seconds – TTL for high-activity records (within 5 days). + * Must exceed the 30-second background refresh interval so the cache + * stays warm between cycles. */ +export const TTL_HIGH_ACTIVITY = 60_000; -/** 60 seconds – TTL for moderate-activity records (within 14 days). */ -export const TTL_MODERATE_ACTIVITY = 60_000; +/** 90 seconds – TTL for moderate-activity records (within 14 days). */ +export const TTL_MODERATE_ACTIVITY = 90_000; /** 15 minutes – TTL for low-activity / stale records. */ export const TTL_LOW_ACTIVITY = 900_000; diff --git a/src/modules/algorithms/computeProductsCacheTTL.ts b/src/modules/algorithms/computeProductsCacheTTL.ts index 2d95bcf..b75b3cb 100644 --- a/src/modules/algorithms/computeProductsCacheTTL.ts +++ b/src/modules/algorithms/computeProductsCacheTTL.ts @@ -18,7 +18,7 @@ * | 1 | Status is **Won**, **Lost**, **Pending Won**, or **Pending Lost** | `null` | No cache | Products on terminal / near-terminal opps are static; no need to keep them warm. | * | 2 | Opportunity is **not cacheable** (main cache TTL is `null`) | `null` | No cache | If the opp itself is evicted, sub-resources follow suit. | * | 3 | `lastUpdated` is within the last **3 days** | 15 000 | 15 seconds | Actively-worked deals — products are being edited and need near-real-time freshness. | - * | 4 | Everything else | 1 800 000 | 30 minutes | Lazy on-demand cache: fetched when requested, expires after 30 min without refresh. | + * | 4 | Everything else | 1 200 000 | 20 minutes | Lazy on-demand cache: fetched when requested, expires after 20 min without refresh. | * * ## Evaluation order * @@ -44,11 +44,13 @@ import { QUOTE_STATUSES } from "../../types/QuoteStatuses"; // Constants // --------------------------------------------------------------------------- -/** 15 seconds — TTL for hot products (opportunity updated within 3 days). */ -export const PRODUCTS_TTL_HOT = 15_000; +/** 45 seconds — TTL for hot products (opportunity updated within 3 days). + * Must exceed the 30-second background refresh interval so the cache + * stays warm between cycles. */ +export const PRODUCTS_TTL_HOT = 45_000; -/** 30 minutes — TTL for on-demand product cache (lazy fallback). */ -export const PRODUCTS_TTL_LAZY = 1_800_000; +/** 20 minutes — TTL for on-demand product cache (lazy fallback). */ +export const PRODUCTS_TTL_LAZY = 1_200_000; /** 3 days in milliseconds. */ const THREE_DAYS_MS = 3 * 24 * 60 * 60 * 1000; diff --git a/src/modules/cache/opportunityCache.ts b/src/modules/cache/opportunityCache.ts index e8214c9..1d76a65 100644 --- a/src/modules/cache/opportunityCache.ts +++ b/src/modules/cache/opportunityCache.ts @@ -310,25 +310,34 @@ export async function fetchAndCacheCompanyCwData( ttlMs: number, ): Promise<{ company: any; defaultContact: any; allContacts: any[] } | null> { try { - const cwCompany = await fetchCwCompanyById(cwCompanyId); + // Fetch company and all-contacts in parallel — the allContacts URL + // can be constructed directly without the company response. + const [cwCompany, allContactsData] = await Promise.all([ + fetchCwCompanyById(cwCompanyId), + withCwRetry( + () => + connectWiseApi.get( + `/company/companies/${cwCompanyId}/contacts?pageSize=1000`, + ), + { label: `company#${cwCompanyId}/allContacts` }, + ), + ]); + if (!cwCompany) return null; - const contactHref = cwCompany.defaultContact?._info?.contact_href; - const defaultContactData = contactHref - ? await withCwRetry(() => connectWiseApi.get(contactHref), { - label: `company#${cwCompanyId}/defaultContact`, - }) - : undefined; - - const allContactsData = await withCwRetry( - () => - connectWiseApi.get(`${cwCompany._info.contacts_href}&pageSize=1000`), - { label: `company#${cwCompanyId}/allContacts` }, - ); + // Default contact: derive from allContacts instead of making an + // extra serial CW call. The company object carries the default + // contact's ID, so we can pull it from the list we already fetched. + const defaultContactId = cwCompany.defaultContact?.id; + const defaultContactData = defaultContactId + ? ((allContactsData.data as any[]).find( + (c: any) => c.id === defaultContactId, + ) ?? null) + : null; const blob = { company: cwCompany, - defaultContact: defaultContactData?.data ?? null, + defaultContact: defaultContactData, allContacts: allContactsData.data, }; @@ -491,11 +500,11 @@ export async function invalidateProductsCache( } /** - * Site TTL — 30 minutes. Site/address data rarely changes so we cache + * Site TTL — 20 minutes. Site/address data rarely changes so we cache * aggressively. The background refresh does NOT proactively warm site keys; * they are populated lazily on the first detail-view request. */ -const SITE_TTL_MS = 1_800_000; +const SITE_TTL_MS = 1_200_000; /** * Fetch a CW company site from ConnectWise and cache the result. diff --git a/src/modules/cw-utils/cwConcurrencyLimiter.ts b/src/modules/cw-utils/cwConcurrencyLimiter.ts new file mode 100644 index 0000000..6082970 --- /dev/null +++ b/src/modules/cw-utils/cwConcurrencyLimiter.ts @@ -0,0 +1,79 @@ +/** + * CW API Concurrency Limiter + * + * Limits the number of simultaneous in-flight requests to the ConnectWise + * API. CW responds significantly slower under high concurrency (observed + * ~3× slower at 9 concurrent vs 5–6 concurrent), so bounding the + * parallelism actually reduces total wall-clock time. + * + * Implemented as an Axios request interceptor that gates on a simple + * counting semaphore. When the limit is reached, new requests queue and + * resolve in FIFO order as earlier requests complete. + */ + +import type { AxiosInstance, InternalAxiosRequestConfig } from "axios"; + +// --------------------------------------------------------------------------- +// Semaphore +// --------------------------------------------------------------------------- + +class Semaphore { + private _current = 0; + private _queue: (() => void)[] = []; + + constructor(private _max: number) {} + + /** Acquire a slot — resolves immediately if under the limit, else waits. */ + acquire(): Promise { + if (this._current < this._max) { + this._current++; + return Promise.resolve(); + } + return new Promise((resolve) => { + this._queue.push(resolve); + }); + } + + /** Release a slot — wakes the next queued caller, if any. */ + release(): void { + const next = this._queue.shift(); + if (next) { + // Hand the slot directly to the next waiter (don't decrement) + next(); + } else { + this._current--; + } + } +} + +// --------------------------------------------------------------------------- +// Interceptor attachment +// --------------------------------------------------------------------------- + +/** + * Attach a concurrency-limiting interceptor to an Axios instance. + * + * @param api - The Axios instance to limit. + * @param max - Maximum concurrent in-flight requests (default: 6). + */ +export function attachCwConcurrencyLimiter(api: AxiosInstance, max = 6): void { + const sem = new Semaphore(max); + + // Request interceptor: wait for a slot before the request fires + api.interceptors.request.use(async (config: InternalAxiosRequestConfig) => { + await sem.acquire(); + return config; + }); + + // Response interceptor: release the slot on success or failure + api.interceptors.response.use( + (response) => { + sem.release(); + return response; + }, + (error) => { + sem.release(); + return Promise.reject(error); + }, + ); +} diff --git a/src/modules/cw-utils/members/memberCache.ts b/src/modules/cw-utils/members/memberCache.ts index 6076c7f..0784fb2 100644 --- a/src/modules/cw-utils/members/memberCache.ts +++ b/src/modules/cw-utils/members/memberCache.ts @@ -102,3 +102,40 @@ export const resolveMember = async ( cwMemberId: cwMember?.id ?? null, }; }; + +/** + * Resolve Multiple CW Identifiers in a Single Batch + * + * Same as `resolveMember` but batches the DB query so that N identifiers + * require only **one** `findMany` instead of N `findFirst` calls. + * + * @param identifiers - Array of CW member identifiers + * @returns Map of identifier → ResolvedMember + */ +export const resolveMembers = async ( + identifiers: string[], +): Promise> => { + const unique = [...new Set(identifiers)]; + + // Single batched DB query for all identifiers + const localUsers = await prisma.user.findMany({ + where: { cwIdentifier: { in: unique } }, + select: { id: true, cwIdentifier: true }, + }); + const userMap = new Map(localUsers.map((u) => [u.cwIdentifier, u.id])); + + const result = new Map(); + for (const identifier of unique) { + const cwMember = memberCache.get(identifier); + const name = cwMember + ? `${cwMember.firstName} ${cwMember.lastName}`.trim() || identifier + : identifier; + result.set(identifier, { + id: userMap.get(identifier) ?? null, + identifier, + name, + cwMemberId: cwMember?.id ?? null, + }); + } + return result; +}; diff --git a/src/modules/cw-utils/procurement/catalog.ts b/src/modules/cw-utils/procurement/catalog.ts index a060b7d..bc4a22d 100644 --- a/src/modules/cw-utils/procurement/catalog.ts +++ b/src/modules/cw-utils/procurement/catalog.ts @@ -66,10 +66,28 @@ export const catalogCw = { return allItems; }, + fetchByCatalogId: async (cwCatalogId: number): Promise => { + try { + const response = await connectWiseApi.get( + `/procurement/catalog/${cwCatalogId}`, + ); + return response.data; + } catch { + const fallback = await connectWiseApi.get( + `/procurement/catalog/items/${cwCatalogId}`, + ); + return fallback.data; + } + }, fetch: async (id: string): Promise => { - const response = await connectWiseApi.get( - `/procurement/catalog/items/${id}`, - ); - return response.data; + const numericId = Number(id); + if (!Number.isFinite(numericId)) { + const response = await connectWiseApi.get( + `/procurement/catalog/items/${id}`, + ); + return response.data; + } + + return catalogCw.fetchByCatalogId(numericId); }, }; diff --git a/src/modules/cw-utils/procurement/listenInventoryAdjustments.ts b/src/modules/cw-utils/procurement/listenInventoryAdjustments.ts new file mode 100644 index 0000000..59f97a4 --- /dev/null +++ b/src/modules/cw-utils/procurement/listenInventoryAdjustments.ts @@ -0,0 +1,469 @@ +import { prisma, redis, connectWiseApi } from "../../../constants"; +import { withCwRetry } from "../withCwRetry"; +import { catalogCw } from "./catalog"; +import { CatalogItem } from "./catalog.types"; + +type JsonObject = Record; + +type TrackedProduct = { + cwCatalogId: number; + product: string; + onHand: string; + inventory: string; + key: string; +}; + +type AdjustmentSnapshot = { + key: string; + trackedRows: TrackedProduct[]; + signature: string; +}; + +const ADJUSTMENTS_ENDPOINT = "/procurement/adjustments?pageSize=1000"; +const CATALOG_ITEM_CACHE_PREFIX = "catalog:item:cw:"; +const CATALOG_ITEM_CACHE_TTL_SECONDS = 20 * 60; +const MAX_SYNC_PER_CYCLE = Number( + process.env.CW_ADJUSTMENT_SYNC_MAX_PER_CYCLE ?? "50", +); +const SYNC_COOLDOWN_MS = Number( + process.env.CW_ADJUSTMENT_SYNC_COOLDOWN_MS ?? `${10 * 60 * 1000}`, +); + +let previous = new Map(); +let previousProductState = new Map(); +const lastSyncedAt = new Map(); +let inFlight = false; + +const isObject = (value: unknown): value is JsonObject => + typeof value === "object" && value !== null && !Array.isArray(value); + +const toObject = (value: unknown): JsonObject => { + if (!isObject(value)) return {}; + + return value; +}; + +const stableStringify = (value: unknown): string => { + if (Array.isArray(value)) { + const entries = value.map((entry) => stableStringify(entry)).sort(); + + return `[${entries.join(",")}]`; + } + + if (isObject(value)) { + const keys = Object.keys(value).sort(); + const pairs = keys.map( + (key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`, + ); + + return `{${pairs.join(",")}}`; + } + + return JSON.stringify(value); +}; + +const readPathValue = (obj: JsonObject, path: string): unknown => { + const parts = path.split("."); + let current: unknown = obj; + + for (const part of parts) { + if (!isObject(current)) return null; + current = current[part]; + } + + return current; +}; + +const firstValue = (obj: JsonObject, paths: string[]): unknown => { + for (const path of paths) { + const value = readPathValue(obj, path); + if (value === null || value === undefined || value === "") continue; + + return value; + } + + return null; +}; + +const asNumber = (value: unknown): number | null => { + if (typeof value === "number" && Number.isFinite(value)) return value; + if (typeof value === "string" && value.length > 0) { + const parsed = Number(value); + if (Number.isFinite(parsed)) return parsed; + } + + return null; +}; + +const asText = (value: unknown): string => { + if (value === null || value === undefined || value === "") return "-"; + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + return String(value); + } + + if (Array.isArray(value)) { + return `[${value.map((entry) => asText(entry)).join(",")}]`; + } + + if (!isObject(value)) return String(value); + + const preferredFields = ["name", "identifier", "id", "code", "value"]; + for (const field of preferredFields) { + const fieldValue = readPathValue(value, field); + if (fieldValue === null || fieldValue === undefined || fieldValue === "") + continue; + if (typeof fieldValue === "object") continue; + + return String(fieldValue); + } + + return stableStringify(value); +}; + +const adjustmentKey = (adjustment: JsonObject): string => { + const keyPaths = [ + "id", + "adjustmentId", + "procurementAdjustmentId", + "recordId", + "recId", + "_info.id", + "_info.href", + ]; + + for (const path of keyPaths) { + const key = firstValue(adjustment, [path]); + const keyText = asText(key); + if (keyText !== "-") return keyText; + } + + return `anon:${stableStringify(adjustment)}`; +}; + +const trackedRow = (detail: JsonObject): TrackedProduct | null => { + const cwCatalogId = asNumber( + firstValue(detail, [ + "catalogItem.id", + "catalogItemId", + "catalog.id", + "catalogId", + "item.id", + "itemId", + "product.id", + "productId", + "id", + ]), + ); + if (!cwCatalogId) return null; + + const onHand = asText( + firstValue(detail, [ + "onHand", + "onHandQty", + "onHandQuantity", + "qtyOnHand", + "quantityOnHand", + "quantity.onHand", + ]), + ); + const inventory = asText( + firstValue(detail, [ + "inventory", + "inventoryQty", + "inventoryLevel", + "quantity", + "qty", + ]), + ); + if (onHand === "-" && inventory === "-") return null; + + const product = asText( + firstValue(detail, [ + "product.name", + "product.identifier", + "item.name", + "item.identifier", + "catalogItem.name", + "catalogItem.identifier", + "productName", + "productIdentifier", + "sku", + "identifier", + ]), + ); + + return { + cwCatalogId, + product, + onHand, + inventory, + key: `${cwCatalogId}|${product}|${onHand}|${inventory}`, + }; +}; + +const trackedRows = (adjustment: JsonObject): TrackedProduct[] => { + const detailCandidates = [ + readPathValue(adjustment, "adjustmentDetails"), + readPathValue(adjustment, "details"), + readPathValue(adjustment, "lineItems"), + ]; + + for (const candidate of detailCandidates) { + if (!Array.isArray(candidate)) continue; + + const rows = candidate + .map((entry) => trackedRow(toObject(entry))) + .filter((entry): entry is TrackedProduct => entry !== null) + .sort((a, b) => a.key.localeCompare(b.key)); + + if (rows.length > 0) return rows; + } + + const root = trackedRow(adjustment); + if (!root) return []; + + return [root]; +}; + +const snapshot = (rows: unknown[]): Map => { + const out = new Map(); + + for (const entry of rows) { + const adjustment = toObject(entry); + const key = adjustmentKey(adjustment); + const rowsTracked = trackedRows(adjustment); + const signature = stableStringify(rowsTracked); + out.set(key, { + key, + trackedRows: rowsTracked, + signature, + }); + } + + return out; +}; + +const changedCatalogIds = ( + before: Map, + after: Map, +): Set => { + const changed = new Set(); + + for (const [cwCatalogId, nextSignature] of after) { + const prevSignature = before.get(cwCatalogId); + if (!prevSignature) { + changed.add(cwCatalogId); + continue; + } + + if (prevSignature === nextSignature) continue; + changed.add(cwCatalogId); + } + + return changed; +}; + +const productState = ( + adjustments: Map, +): Map => { + const grouped = new Map>(); + + for (const snapshot of adjustments.values()) { + for (const row of snapshot.trackedRows) { + const rows = grouped.get(row.cwCatalogId) ?? new Set(); + rows.add(row.key); + grouped.set(row.cwCatalogId, rows); + } + } + + const state = new Map(); + for (const [cwCatalogId, rows] of grouped) { + state.set(cwCatalogId, stableStringify([...rows].sort())); + } + + return state; +}; + +const applySyncGuards = (ids: number[]): number[] => { + const now = Date.now(); + const cooledIds = ids.filter((cwCatalogId) => { + const last = lastSyncedAt.get(cwCatalogId); + if (!last) return true; + + return now - last >= SYNC_COOLDOWN_MS; + }); + + if (cooledIds.length <= MAX_SYNC_PER_CYCLE) return cooledIds; + return cooledIds.slice(0, MAX_SYNC_PER_CYCLE); +}; + +const fetchAdjustments = async (): Promise => { + const response = await withCwRetry( + () => connectWiseApi.get(ADJUSTMENTS_ENDPOINT), + { + label: "inventory-adjustments", + maxAttempts: 3, + }, + ); + const payload = response.data; + + if (Array.isArray(payload)) return payload; + if (isObject(payload) && Array.isArray(payload.data)) return payload.data; + + return []; +}; + +const cacheKey = (cwCatalogId: number) => + `${CATALOG_ITEM_CACHE_PREFIX}${cwCatalogId}`; + +const cwLastUpdated = (item: CatalogItem): Date => { + const value = item._info?.lastUpdated; + if (!value) return new Date(); + + const parsed = new Date(value); + const invalidDate = Number.isNaN(parsed.getTime()); + if (invalidDate) return new Date(); + + return parsed; +}; + +const syncCatalogItem = async (cwCatalogId: number): Promise => { + try { + const item = await withCwRetry( + () => catalogCw.fetchByCatalogId(cwCatalogId), + { + label: `catalog-item:${cwCatalogId}`, + maxAttempts: 3, + }, + ); + const onHand = await withCwRetry( + () => catalogCw.fetchInventoryOnHand(cwCatalogId), + { + label: `catalog-onhand:${cwCatalogId}`, + maxAttempts: 3, + }, + ); + + const persisted = await prisma.catalogItem.upsert({ + where: { cwCatalogId }, + create: { + cwCatalogId, + identifier: item.identifier, + name: item.description, + description: item.description, + customerDescription: item.customerDescription, + internalNotes: item.notes, + category: item.category?.name, + categoryCwId: item.category?.id, + subcategory: item.subcategory?.name, + subcategoryCwId: item.subcategory?.id, + manufacturer: item.manufacturer?.name, + manufactureCwId: item.manufacturer?.id, + partNumber: item.manufacturerPartNumber, + vendorName: item.vendor?.name, + vendorSku: item.vendorSku, + vendorCwId: item.vendor?.id, + price: item.price, + cost: item.cost, + inactive: item.inactiveFlag, + salesTaxable: item.taxableFlag, + onHand, + cwLastUpdated: cwLastUpdated(item), + }, + update: { + identifier: item.identifier, + name: item.description, + description: item.description, + customerDescription: item.customerDescription, + internalNotes: item.notes, + category: item.category?.name, + categoryCwId: item.category?.id, + subcategory: item.subcategory?.name, + subcategoryCwId: item.subcategory?.id, + manufacturer: item.manufacturer?.name, + manufactureCwId: item.manufacturer?.id, + partNumber: item.manufacturerPartNumber, + vendorName: item.vendor?.name, + vendorSku: item.vendorSku, + vendorCwId: item.vendor?.id, + price: item.price, + cost: item.cost, + inactive: item.inactiveFlag, + salesTaxable: item.taxableFlag, + onHand, + cwLastUpdated: cwLastUpdated(item), + }, + }); + + await redis.set( + cacheKey(cwCatalogId), + JSON.stringify({ + cwCatalogId, + onHand, + cwItem: item, + dbItem: persisted, + syncedAt: new Date().toISOString(), + }), + "EX", + CATALOG_ITEM_CACHE_TTL_SECONDS, + ); + + return true; + } catch (err) { + console.error( + `[inventory-adjustments] failed to sync catalog item ${cwCatalogId}`, + err, + ); + return false; + } +}; + +export const listenInventoryAdjustments = async (): Promise => { + if (inFlight) return; + inFlight = true; + + try { + const rows = await fetchAdjustments(); + const current = snapshot(rows); + const currentProductState = productState(current); + + if (previous.size === 0) { + previous = current; + previousProductState = currentProductState; + console.log( + `[inventory-adjustments] baseline captured (${current.size} adjustments, ${currentProductState.size} products)`, + ); + return; + } + + const changedIds = [ + ...changedCatalogIds(previousProductState, currentProductState), + ].sort((a, b) => a - b); + const guardedIds = applySyncGuards(changedIds); + previous = current; + previousProductState = currentProductState; + if (guardedIds.length === 0) return; + + let successCount = 0; + for (const cwCatalogId of guardedIds) { + const ok = await syncCatalogItem(cwCatalogId); + if (!ok) continue; + lastSyncedAt.set(cwCatalogId, Date.now()); + successCount += 1; + } + + const skippedByCooldown = changedIds.length - guardedIds.length; + + console.log( + `[inventory-adjustments] inventory changed for ${changedIds.length} products, queued ${guardedIds.length}, synced ${successCount}, cooldown/cap skipped ${skippedByCooldown}`, + ); + } catch (err) { + console.error("[inventory-adjustments] listener failed", err); + } finally { + inFlight = false; + } +}; diff --git a/src/modules/cw-utils/procurement/refreshCatalog.ts b/src/modules/cw-utils/procurement/refreshCatalog.ts index 496589b..17f4b6f 100644 --- a/src/modules/cw-utils/procurement/refreshCatalog.ts +++ b/src/modules/cw-utils/procurement/refreshCatalog.ts @@ -2,6 +2,31 @@ import { prisma } from "../../../constants"; import { events } from "../../globalEvents"; import { catalogCw } from "./catalog"; +const CONCURRENCY = 6; +const BATCH_DELAY_MS = 250; + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const runSlowParallel = async ( + tasks: Array<() => Promise>, +): Promise => { + let failureCount = 0; + + for (let i = 0; i < tasks.length; i += CONCURRENCY) { + const batch = tasks.slice(i, i + CONCURRENCY); + const results = await Promise.allSettled(batch.map((task) => task())); + + for (const result of results) { + if (result.status === "rejected") failureCount += 1; + } + + if (i + CONCURRENCY >= tasks.length) continue; + await sleep(BATCH_DELAY_MS); + } + + return failureCount; +}; + export const refreshCatalog = async () => { events.emit("cw:catalog:refresh:check"); @@ -46,101 +71,104 @@ export const refreshCatalog = async () => { staleCount: staleIds.length, }); - // 4. Fetch full catalog data, then filter to only stale items - const staleIdSet = new Set(staleIds); - const allCwItems = await catalogCw.fetchAllItemsFromCw(); - const allStaleItems = new Map(); + // 4. Fetch full CW item data for stale IDs using slow, bounded concurrency + const cwItemMap = new Map(); + const itemFetchTasks: Array<() => Promise> = staleIds.map( + (cwId) => async () => { + const item = await catalogCw.fetchByCatalogId(cwId); + cwItemMap.set(cwId, item); + }, + ); + const itemFetchFailures = await runSlowParallel(itemFetchTasks); - for (const [id, item] of allCwItems) { - if (staleIdSet.has(id)) { - allStaleItems.set(id, item); - } - } - - // 5. Batch fetch inventory onHand for stale items (50 concurrent) + // 5. Fetch inventory onHand for stale IDs using the same slow parallel strategy const onHandMap = new Map(); - const batchSize = 50; + const inventoryTasks: Array<() => Promise> = staleIds.map( + (cwId) => async () => { + try { + const onHand = await catalogCw.fetchInventoryOnHand(cwId); + onHandMap.set(cwId, onHand); + } catch { + onHandMap.set(cwId, 0); + } + }, + ); + const inventoryFailures = await runSlowParallel(inventoryTasks); - for (let i = 0; i < staleIds.length; i += batchSize) { - const batch = staleIds.slice(i, i + batchSize); - await Promise.all( - batch.map(async (cwId) => { - try { - const onHand = await catalogCw.fetchInventoryOnHand(cwId); - onHandMap.set(cwId, onHand); - } catch { - onHandMap.set(cwId, 0); - } - }), + // 6. Upsert stale/new items with bounded slow parallel execution + let updatedCount = 0; + const upsertTasks: Array<() => Promise> = staleIds.map( + (cwId) => async () => { + const item = cwItemMap.get(cwId); + if (!item) return; + + const cwLastUpdated = item._info?.lastUpdated + ? new Date(item._info.lastUpdated) + : new Date(); + const onHand = onHandMap.get(cwId) ?? 0; + + await prisma.catalogItem.upsert({ + where: { cwCatalogId: cwId }, + create: { + cwCatalogId: cwId, + identifier: item.identifier, + name: item.description, + description: item.description, + customerDescription: item.customerDescription, + internalNotes: item.notes, + category: item.category?.name, + categoryCwId: item.category?.id, + subcategory: item.subcategory?.name, + subcategoryCwId: item.subcategory?.id, + manufacturer: item.manufacturer?.name, + manufactureCwId: item.manufacturer?.id, + partNumber: item.manufacturerPartNumber, + vendorName: item.vendor?.name, + vendorSku: item.vendorSku, + vendorCwId: item.vendor?.id, + price: item.price, + cost: item.cost, + inactive: item.inactiveFlag, + salesTaxable: item.taxableFlag, + onHand, + cwLastUpdated, + }, + update: { + name: item.description, + identifier: item.identifier, + description: item.description, + customerDescription: item.customerDescription, + internalNotes: item.notes, + category: item.category?.name, + categoryCwId: item.category?.id, + subcategory: item.subcategory?.name, + subcategoryCwId: item.subcategory?.id, + manufacturer: item.manufacturer?.name, + manufactureCwId: item.manufacturer?.id, + partNumber: item.manufacturerPartNumber, + vendorName: item.vendor?.name, + vendorSku: item.vendorSku, + vendorCwId: item.vendor?.id, + price: item.price, + cost: item.cost, + inactive: item.inactiveFlag, + salesTaxable: item.taxableFlag, + onHand, + cwLastUpdated, + }, + }); + updatedCount += 1; + }, + ); + const upsertFailures = await runSlowParallel(upsertTasks); + + const failedTasks = itemFetchFailures + inventoryFailures + upsertFailures; + if (failedTasks > 0) { + console.warn( + `[catalog-refresh] ${failedTasks} slow-parallel task(s) failed; remaining items will retry next cycle`, ); } - // 6. Upsert only the stale/new items - const updatedCount = ( - await Promise.all( - staleIds.map(async (cwId) => { - const item = allStaleItems.get(cwId); - if (!item) return null; - - const cwLastUpdated = item._info?.lastUpdated - ? new Date(item._info.lastUpdated) - : new Date(); - const onHand = onHandMap.get(cwId) ?? 0; - - return await prisma.catalogItem.upsert({ - where: { cwCatalogId: cwId }, - create: { - cwCatalogId: cwId, - identifier: item.identifier, - name: item.description, - description: item.description, - customerDescription: item.customerDescription, - internalNotes: item.notes, - category: item.category?.name, - categoryCwId: item.category?.id, - subcategory: item.subcategory?.name, - subcategoryCwId: item.subcategory?.id, - manufacturer: item.manufacturer?.name, - manufactureCwId: item.manufacturer?.id, - partNumber: item.manufacturerPartNumber, - vendorName: item.vendor?.name, - vendorSku: item.vendorSku, - vendorCwId: item.vendor?.id, - price: item.price, - cost: item.cost, - inactive: item.inactiveFlag, - salesTaxable: item.taxableFlag, - onHand, - cwLastUpdated, - }, - update: { - name: item.description, - identifier: item.identifier, - description: item.description, - customerDescription: item.customerDescription, - internalNotes: item.notes, - category: item.category?.name, - categoryCwId: item.category?.id, - subcategory: item.subcategory?.name, - subcategoryCwId: item.subcategory?.id, - manufacturer: item.manufacturer?.name, - manufactureCwId: item.manufacturer?.id, - partNumber: item.manufacturerPartNumber, - vendorName: item.vendor?.name, - vendorSku: item.vendorSku, - vendorCwId: item.vendor?.id, - price: item.price, - cost: item.cost, - inactive: item.inactiveFlag, - salesTaxable: item.taxableFlag, - onHand, - cwLastUpdated, - }, - }); - }), - ) - ).filter(Boolean).length; - events.emit("cw:catalog:refresh:completed", { totalCw: cwSummaries.size, totalDb: dbItems.length, diff --git a/src/modules/cw-utils/procurement/refreshInventory.ts b/src/modules/cw-utils/procurement/refreshInventory.ts index 5c642b9..dc603de 100644 --- a/src/modules/cw-utils/procurement/refreshInventory.ts +++ b/src/modules/cw-utils/procurement/refreshInventory.ts @@ -2,6 +2,11 @@ import { prisma } from "../../../constants"; import { events } from "../../globalEvents"; import { catalogCw } from "./catalog"; +const CONCURRENCY = 6; +const BATCH_DELAY_MS = 250; + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + export const refreshInventory = async () => { events.emit("cw:inventory:refresh:check"); @@ -23,13 +28,13 @@ export const refreshInventory = async () => { totalItems: dbItems.length, }); - // 2. Batch fetch inventory onHand for all items (50 concurrent) + // 2. Slow-parallel fetch inventory onHand for all items const onHandMap = new Map(); - const batchSize = 150; + let failedCount = 0; - for (let i = 0; i < dbItems.length; i += batchSize) { - const batch = dbItems.slice(i, i + batchSize); - await Promise.all( + for (let i = 0; i < dbItems.length; i += CONCURRENCY) { + const batch = dbItems.slice(i, i + CONCURRENCY); + const results = await Promise.allSettled( batch.map(async (item) => { try { const onHand = await catalogCw.fetchInventoryOnHand(item.cwCatalogId); @@ -39,6 +44,13 @@ export const refreshInventory = async () => { } }), ); + + for (const result of results) { + if (result.status === "rejected") failedCount += 1; + } + + if (i + CONCURRENCY >= dbItems.length) continue; + await sleep(BATCH_DELAY_MS); } // 3. Only update items where onHand has changed @@ -71,4 +83,10 @@ export const refreshInventory = async () => { totalItems: dbItems.length, updatedCount, }); + + if (failedCount > 0) { + console.warn( + `[inventory-refresh] ${failedCount} task(s) failed; fallback values were used and will retry next sweep`, + ); + } }; diff --git a/src/types/PermissionNodes.ts b/src/types/PermissionNodes.ts index 5c12bd6..a20bae6 100644 --- a/src/types/PermissionNodes.ts +++ b/src/types/PermissionNodes.ts @@ -383,6 +383,13 @@ export const PERMISSION_NODES = { ], }, + cwCallbacks: { + name: "ConnectWise Callback Routes", + description: + "Inbound ConnectWise callback endpoints. These routes are intentionally unauthenticated and do not require permission nodes.", + permissions: [], + }, + sales: { name: "Sales Permissions", description: "Permissions for accessing and managing sales opportunities", diff --git a/test-cw-edit-item.ts b/test-cw-edit-item.ts new file mode 100644 index 0000000..86f613a --- /dev/null +++ b/test-cw-edit-item.ts @@ -0,0 +1,600 @@ +/** + * Test Script: CW Forecast Item Edit & Partial Cancellation + * + * This script performs read-write operations against the ConnectWise API: + * + * 1. Search all open opportunities for a forecast item with description + * matching "labor Special Order" (case-insensitive). + * 2. Report the current state of that item (price, cost, qty, etc.). + * 3. PATCH the item: revenue → 72,000 | cost → 8,500 | quantity → 67 + * 4. Verify the update by re-fetching the forecast. + * 5. Cancel 13 units via the linked procurement product + * (partial cancellation: quantityCancelled = 13). + * 6. Verify the cancellation by re-fetching procurement data. + * 7. Report on every step. + * + * Usage: bun run test-cw-edit-item.ts + */ + +import axios from "axios"; + +const cw = axios.create({ + baseURL: "https://ttscw.totaltech.net/v4_6_release/apis/3.0/", + headers: { + Authorization: `Basic ${process.env.CW_BASIC_TOKEN}`, + clientId: `${process.env.CW_CLIENT_ID}`, + "Content-Type": "application/json", + }, + timeout: 30_000, +}); + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +const log = (label: string, ...args: unknown[]) => + console.log(`\n[${label}]`, ...args); + +const divider = () => console.log("─".repeat(72)); + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +const fmt = (n: number) => + n.toLocaleString("en-US", { + minimumFractionDigits: 2, + maximumFractionDigits: 2, + }); + +// ── Types (minimal, for this script) ────────────────────────────────────────── + +interface ForecastItem { + id: number; + forecastDescription: string; + productDescription: string; + quantity: number; + revenue: number; + cost: number; + margin: number; + forecastType: string; + sequenceNumber: number; + catalogItem?: { id: number; identifier: string }; + status?: { id: number; name: string }; + opportunity?: { id: number; name: string }; + [key: string]: unknown; +} + +interface Forecast { + id: number; + forecastItems: ForecastItem[]; + [key: string]: unknown; +} + +interface ProcurementProduct { + id: number; + forecastDetailId: number; + description: string; + quantity: number; + price: number; + cost: number; + cancelledFlag: boolean; + quantityCancelled: number; + cancelledReason: string | null; + cancelledBy: string | null; + cancelledDate: string | null; + opportunity?: { id: number }; + [key: string]: unknown; +} + +// ── Main ────────────────────────────────────────────────────────────────────── + +async function main() { + divider(); + log("START", "CW Forecast Item Edit & Cancellation Test"); + log("START", `Timestamp: ${new Date().toISOString()}`); + divider(); + + // ── Step 1: Find the "labor Special Order" forecast item ──────────────── + + const OPP_ID = 5150; + log( + "SEARCH", + `Looking for forecast item matching "labor Special Order" on opportunity ${OPP_ID}...`, + ); + + // Fetch the forecast for opportunity 5150 directly + let targetOppId: number = OPP_ID; + let targetItem: ForecastItem | null = null; + let targetForecast: Forecast | null = null; + + const forecastRes = await cw.get(`/sales/opportunities/${OPP_ID}/forecast`); + targetForecast = forecastRes.data as Forecast; + const match = (targetForecast.forecastItems ?? []).find( + (fi: ForecastItem) => + fi.forecastDescription?.toLowerCase().includes("special order") || + fi.productDescription?.toLowerCase().includes("special order"), + ); + + if (match) { + targetItem = match; + log("SEARCH", `✓ FOUND forecast item on opportunity ${OPP_ID}`); + } + + if (!targetItem || !targetForecast) { + log( + "SEARCH", + `✗ No "labor Special Order" item found on opportunity ${OPP_ID}.`, + ); + log("SEARCH", "All forecast items on this opportunity:"); + for (const fi of targetForecast.forecastItems ?? []) { + console.log( + ` id=${fi.id} "${fi.forecastDescription}" / "${fi.productDescription}"`, + ); + } + log("SEARCH", "Aborting."); + process.exit(1); + } + + // ── Step 2: Report current state ──────────────────────────────────────── + + divider(); + log("CURRENT STATE", "Forecast item details BEFORE edit:"); + console.log(` Opportunity ID: ${targetOppId}`); + console.log(` Forecast Item ID: ${targetItem.id}`); + console.log(` Forecast Description: ${targetItem.forecastDescription}`); + console.log(` Product Description: ${targetItem.productDescription}`); + console.log( + ` Catalog Item: ${targetItem.catalogItem?.identifier ?? "(none)"} (cwId=${targetItem.catalogItem?.id ?? "N/A"})`, + ); + console.log(` Forecast Type: ${targetItem.forecastType}`); + console.log( + ` Status: ${targetItem.status?.name ?? "?"} (id=${targetItem.status?.id ?? "?"})`, + ); + console.log(` Sequence Number: ${targetItem.sequenceNumber}`); + console.log(` ──────────────────────────────────`); + console.log(` Quantity: ${targetItem.quantity}`); + console.log(` Revenue (Price): $${fmt(targetItem.revenue)}`); + console.log(` Cost: $${fmt(targetItem.cost)}`); + console.log(` Margin: $${fmt(targetItem.margin)}`); + + // Also report all items on this opportunity for context + const allItems = targetForecast.forecastItems ?? []; + log( + "CONTEXT", + `Total forecast items on this opportunity: ${allItems.length}`, + ); + for (const fi of allItems) { + const marker = fi.id === targetItem.id ? " ◀ TARGET" : ""; + console.log( + ` [${fi.sequenceNumber}] id=${fi.id} "${fi.forecastDescription}" ` + + `qty=${fi.quantity} rev=$${fmt(fi.revenue)} cost=$${fmt(fi.cost)}${marker}`, + ); + } + + // ── Step 3: PATCH the forecast item ───────────────────────────────────── + + divider(); + const UNIT_PRICE = 72_000; + const UNIT_COST = 8_500; + const QTY = 67; + const TOTAL_REVENUE = UNIT_PRICE * QTY; // $4,824,000 + const TOTAL_COST = UNIT_COST * QTY; // $569,500 + + log("EDIT", "Patching forecast item..."); + log( + "EDIT", + ` Unit price: $${fmt(UNIT_PRICE)} × ${QTY} = $${fmt(TOTAL_REVENUE)} (revenue)`, + ); + log( + "EDIT", + ` Unit cost: $${fmt(UNIT_COST)} × ${QTY} = $${fmt(TOTAL_COST)} (cost)`, + ); + log("EDIT", ` Quantity: ${QTY}`); + + // Find the index of our target item in the forecast array + const forecastItems = targetForecast.forecastItems ?? []; + const targetIdx = forecastItems.findIndex((fi) => fi.id === targetItem!.id); + + if (targetIdx === -1) { + log( + "EDIT", + "✗ Could not find target item index in forecast array. Aborting.", + ); + process.exit(1); + } + + log("EDIT", `Target item is at index ${targetIdx} in forecastItems array.`); + + const patchOps = [ + { + op: "replace", + path: `/forecastItems/${targetIdx}/revenue`, + value: TOTAL_REVENUE, + }, + { + op: "replace", + path: `/forecastItems/${targetIdx}/cost`, + value: TOTAL_COST, + }, + { op: "replace", path: `/forecastItems/${targetIdx}/quantity`, value: QTY }, + ]; + + log("EDIT", "Patch operations:"); + for (const op of patchOps) { + console.log(` ${op.op} ${op.path} → ${op.value}`); + } + + try { + const patchRes = await cw.patch( + `/sales/opportunities/${targetOppId}/forecast`, + patchOps, + ); + const updatedForecast: Forecast = patchRes.data; + const updatedItem = (updatedForecast.forecastItems ?? [])[targetIdx]; + + if (!updatedItem) { + log("EDIT", "✗ Item not found at expected index after PATCH."); + } else { + log("EDIT", "✓ PATCH successful. Updated item:"); + console.log(` Forecast Item ID: ${updatedItem.id}`); + console.log(` Forecast Description: ${updatedItem.forecastDescription}`); + console.log(` Quantity: ${updatedItem.quantity}`); + console.log(` Revenue (Price): $${fmt(updatedItem.revenue)}`); + console.log(` Cost: $${fmt(updatedItem.cost)}`); + console.log(` Margin: $${fmt(updatedItem.margin)}`); + + // Verify values match what we set + const checks = [ + { + field: "revenue", + expected: TOTAL_REVENUE, + actual: updatedItem.revenue, + }, + { field: "cost", expected: TOTAL_COST, actual: updatedItem.cost }, + { field: "quantity", expected: QTY, actual: updatedItem.quantity }, + ]; + + log("VERIFY EDIT", "Checking values match requested:"); + for (const check of checks) { + const ok = check.actual === check.expected; + console.log( + ` ${ok ? "✓" : "✗"} ${check.field}: expected=${check.expected}, actual=${check.actual}`, + ); + } + + // Update our reference for the cancellation step + targetItem = updatedItem; + } + } catch (err: any) { + log("EDIT", `✗ PATCH failed: ${err.response?.status ?? err.message}`); + if (err.response?.data) { + console.log(" Response:", JSON.stringify(err.response.data, null, 2)); + } + + // If quantity PATCH failed (read-only), try without quantity + if (err.response?.status === 400 || err.response?.status === 422) { + log( + "EDIT", + "Retrying without quantity (may be read-only on forecast items)...", + ); + const retryOps = patchOps.filter((op) => !op.path.endsWith("/quantity")); + try { + const retryRes = await cw.patch( + `/sales/opportunities/${targetOppId}/forecast`, + retryOps, + ); + const retryForecast: Forecast = retryRes.data; + const retryItem = (retryForecast.forecastItems ?? [])[targetIdx]; + + if (retryItem) { + log( + "EDIT", + "✓ Retry PATCH successful (without quantity). Updated item:", + ); + console.log( + ` Quantity: ${retryItem.quantity} (unchanged — read-only)`, + ); + console.log(` Revenue (Price): $${fmt(retryItem.revenue)}`); + console.log(` Cost: $${fmt(retryItem.cost)}`); + console.log(` Margin: $${fmt(retryItem.margin)}`); + targetItem = retryItem; + } + } catch (retryErr: any) { + log( + "EDIT", + `✗ Retry also failed: ${retryErr.response?.status ?? retryErr.message}`, + ); + if (retryErr.response?.data) { + console.log( + " Response:", + JSON.stringify(retryErr.response.data, null, 2), + ); + } + } + } + } + + // ── Step 4: Re-fetch and confirm final forecast state ─────────────────── + + divider(); + log("RE-FETCH", "Fetching forecast to confirm final state..."); + await sleep(500); + + const confirmRes = await cw.get( + `/sales/opportunities/${targetOppId}/forecast`, + ); + const confirmedForecast: Forecast = confirmRes.data; + const confirmedItem = (confirmedForecast.forecastItems ?? []).find( + (fi) => fi.id === targetItem!.id, + ); + + if (confirmedItem) { + log("CONFIRMED STATE", "Forecast item after edit:"); + console.log(` Forecast Item ID: ${confirmedItem.id}`); + console.log(` Forecast Description: ${confirmedItem.forecastDescription}`); + console.log(` Quantity: ${confirmedItem.quantity}`); + console.log(` Revenue (Price): $${fmt(confirmedItem.revenue)}`); + console.log(` Cost: $${fmt(confirmedItem.cost)}`); + console.log(` Margin: $${fmt(confirmedItem.margin)}`); + } else { + log( + "CONFIRMED STATE", + "⚠ Could not find item by original ID — it may have been regenerated.", + ); + log("CONFIRMED STATE", "All current forecast items:"); + for (const fi of confirmedForecast.forecastItems ?? []) { + console.log( + ` id=${fi.id} "${fi.forecastDescription}" qty=${fi.quantity} rev=$${fmt(fi.revenue)} cost=$${fmt(fi.cost)}`, + ); + } + } + + // ── Step 5: Cancel 13 items via procurement product ───────────────────── + + divider(); + log("CANCEL", "Cancelling 13 units on this item via procurement product..."); + + // First, find existing procurement products linked to this opportunity + const procRes = await cw.get( + `/procurement/products?conditions=${encodeURIComponent(`opportunity/id=${targetOppId}`)}&pageSize=1000`, + ); + const procProducts: ProcurementProduct[] = procRes.data; + + log( + "CANCEL", + `Found ${procProducts.length} procurement product(s) on this opportunity.`, + ); + + if (procProducts.length > 0) { + for (const pp of procProducts) { + console.log( + ` Proc id=${pp.id} forecastDetailId=${pp.forecastDetailId} ` + + `"${pp.description}" qty=${pp.quantity} price=$${fmt(pp.price ?? 0)} ` + + `cancelled=${pp.cancelledFlag} qtyCancelled=${pp.quantityCancelled}`, + ); + } + } + + // Find the procurement product linked to our forecast item + const linkedProc = procProducts.find( + (pp) => pp.forecastDetailId === targetItem!.id, + ); + + if (linkedProc) { + log("CANCEL", `Found linked procurement product: id=${linkedProc.id}`); + log( + "CANCEL", + `Current state: cancelled=${linkedProc.cancelledFlag}, quantityCancelled=${linkedProc.quantityCancelled}`, + ); + log("CANCEL", "Patching: quantityCancelled → 13, cancelledFlag → true"); + + try { + const cancelRes = await cw.patch( + `/procurement/products/${linkedProc.id}`, + [ + { op: "replace", path: "cancelledFlag", value: true }, + { op: "replace", path: "quantityCancelled", value: 13 }, + { + op: "replace", + path: "cancelledReason", + value: "Test cancellation — 13 units", + }, + ], + ); + + log("CANCEL", "✓ Cancellation PATCH successful."); + console.log(` cancelledFlag: ${cancelRes.data.cancelledFlag}`); + console.log(` quantityCancelled: ${cancelRes.data.quantityCancelled}`); + console.log(` cancelledReason: ${cancelRes.data.cancelledReason}`); + console.log( + ` cancelledBy: ${cancelRes.data.cancelledBy ?? "N/A"}`, + ); + console.log( + ` cancelledDate: ${cancelRes.data.cancelledDate ?? "N/A"}`, + ); + } catch (err: any) { + log( + "CANCEL", + `✗ Cancellation PATCH failed: ${err.response?.status ?? err.message}`, + ); + if (err.response?.data) { + console.log(" Response:", JSON.stringify(err.response.data, null, 2)); + } + } + } else { + log( + "CANCEL", + `No procurement product linked to forecast item id=${targetItem!.id}.`, + ); + log( + "CANCEL", + "Creating a procurement product first, then cancelling 13...", + ); + + try { + // Create a procurement product linked to this forecast item + const createProcRes = await cw.post("/procurement/products", { + catalogItem: targetItem!.catalogItem?.id + ? { id: targetItem!.catalogItem.id } + : undefined, + description: + targetItem!.forecastDescription || targetItem!.productDescription, + quantity: targetItem!.quantity || 67, + price: targetItem!.revenue || 72_000, + cost: targetItem!.cost || 8_500, + billableOption: "Billable", + opportunity: { id: targetOppId }, + forecastDetailId: targetItem!.id, + }); + + const newProc = createProcRes.data; + log("CANCEL", `✓ Created procurement product id=${newProc.id}`); + console.log(` forecastDetailId: ${newProc.forecastDetailId}`); + console.log(` description: ${newProc.description}`); + console.log(` quantity: ${newProc.quantity}`); + console.log(` price: $${fmt(newProc.price ?? 0)}`); + console.log(` cost: $${fmt(newProc.cost ?? 0)}`); + + // Now cancel 13 units + log("CANCEL", "Patching procurement product: quantityCancelled → 13..."); + const cancelRes = await cw.patch(`/procurement/products/${newProc.id}`, [ + { op: "replace", path: "cancelledFlag", value: true }, + { op: "replace", path: "quantityCancelled", value: 13 }, + { + op: "replace", + path: "cancelledReason", + value: "Test cancellation — 13 units", + }, + ]); + + log("CANCEL", "✓ Cancellation PATCH successful."); + console.log(` cancelledFlag: ${cancelRes.data.cancelledFlag}`); + console.log(` quantityCancelled: ${cancelRes.data.quantityCancelled}`); + console.log(` cancelledReason: ${cancelRes.data.cancelledReason}`); + console.log( + ` cancelledBy: ${cancelRes.data.cancelledBy ?? "N/A"}`, + ); + console.log( + ` cancelledDate: ${cancelRes.data.cancelledDate ?? "N/A"}`, + ); + } catch (err: any) { + log("CANCEL", `✗ Failed: ${err.response?.status ?? err.message}`); + if (err.response?.data) { + console.log(" Response:", JSON.stringify(err.response.data, null, 2)); + } + } + } + + // ── Step 6: Final verification ────────────────────────────────────────── + + divider(); + log("FINAL VERIFY", "Re-fetching all data for final report..."); + await sleep(500); + + // Re-fetch forecast + const finalForecastRes = await cw.get( + `/sales/opportunities/${targetOppId}/forecast`, + ); + const finalForecast: Forecast = finalForecastRes.data; + const finalItem = + (finalForecast.forecastItems ?? []).find( + (fi) => fi.id === targetItem!.id, + ) ?? + (finalForecast.forecastItems ?? []).find( + (fi) => + fi.forecastDescription?.toLowerCase().includes("special order") || + fi.productDescription?.toLowerCase().includes("special order"), + ); + + // Re-fetch procurement + const finalProcRes = await cw.get( + `/procurement/products?conditions=${encodeURIComponent(`opportunity/id=${targetOppId}`)}&pageSize=1000`, + ); + const finalProcs: ProcurementProduct[] = finalProcRes.data; + + log("FINAL STATE — FORECAST ITEM", ""); + if (finalItem) { + console.log(` Forecast Item ID: ${finalItem.id}`); + console.log(` Forecast Description: ${finalItem.forecastDescription}`); + console.log(` Quantity: ${finalItem.quantity}`); + console.log(` Revenue (Price): $${fmt(finalItem.revenue)}`); + console.log(` Cost: $${fmt(finalItem.cost)}`); + console.log(` Margin: $${fmt(finalItem.margin)}`); + } else { + console.log(" ⚠ Target item not found in final forecast."); + } + + log("FINAL STATE — PROCUREMENT", `${finalProcs.length} product(s):`); + for (const pp of finalProcs) { + console.log( + ` id=${pp.id} forecastDetailId=${pp.forecastDetailId} ` + + `"${pp.description}" qty=${pp.quantity} cancelled=${pp.cancelledFlag} ` + + `qtyCancelled=${pp.quantityCancelled} reason="${pp.cancelledReason ?? ""}"`, + ); + } + + // ── Summary ───────────────────────────────────────────────────────────── + + divider(); + log("SUMMARY", ""); + + // After cancelling 13 of 67, CW recalculates totals for remaining 54 units + const expectedFinalRevenue = Math.round(UNIT_PRICE * (QTY - 13) * 100) / 100; + const expectedFinalCost = Math.round(UNIT_COST * (QTY - 13) * 100) / 100; + + const editOk = finalItem + ? Math.abs(finalItem.revenue - expectedFinalRevenue) < 1 && + Math.abs(finalItem.cost - expectedFinalCost) < 1 + : false; + const qtyOk = finalItem ? finalItem.quantity === QTY : false; + + if (finalItem) { + console.log( + ` Expected final revenue ($${fmt(UNIT_PRICE)} × ${QTY - 13}): $${fmt(expectedFinalRevenue)}`, + ); + console.log( + ` Actual final revenue: $${fmt(finalItem.revenue)}`, + ); + console.log( + ` Expected final cost ($${fmt(UNIT_COST)} × ${QTY - 13}): $${fmt(expectedFinalCost)}`, + ); + console.log( + ` Actual final cost: $${fmt(finalItem.cost)}`, + ); + } + const cancelOk = finalProcs.some( + (pp) => + pp.forecastDetailId === targetItem!.id && + pp.cancelledFlag === true && + pp.quantityCancelled === 13, + ); + + console.log( + ` Unit price $${fmt(UNIT_PRICE)}/ea: `, + editOk ? "✓ PASS" : "✗ FAIL", + ); + console.log( + ` Unit cost $${fmt(UNIT_COST)}/ea: `, + editOk ? "✓ PASS" : "✗ FAIL", + ); + console.log( + ` Quantity set to ${QTY}: `, + qtyOk ? "✓ PASS" : "✗ FAIL (may be read-only)", + ); + console.log( + " 13 units cancelled: ", + cancelOk ? "✓ PASS" : "✗ FAIL", + ); + + const allPass = editOk && qtyOk && cancelOk; + divider(); + log( + "RESULT", + allPass + ? "✓ ALL CHECKS PASSED" + : "⚠ SOME CHECKS DID NOT PASS — review output above", + ); + divider(); +} + +main().catch((err) => { + console.error("\n[FATAL]", err.response?.data ?? err.message); + process.exit(1); +}); diff --git a/tests/unit/computeProductsCacheTTL.test.ts b/tests/unit/computeProductsCacheTTL.test.ts index 94e5c67..214e39d 100644 --- a/tests/unit/computeProductsCacheTTL.test.ts +++ b/tests/unit/computeProductsCacheTTL.test.ts @@ -11,12 +11,12 @@ const DAY_MS = 24 * 60 * 60 * 1000; describe("computeProductsCacheTTL", () => { // -- Constants ---------------------------------------------------------- - test("PRODUCTS_TTL_HOT is 15 seconds", () => { - expect(PRODUCTS_TTL_HOT).toBe(15_000); + test("PRODUCTS_TTL_HOT is 45 seconds", () => { + expect(PRODUCTS_TTL_HOT).toBe(45_000); }); - test("PRODUCTS_TTL_LAZY is 30 minutes", () => { - expect(PRODUCTS_TTL_LAZY).toBe(1_800_000); + test("PRODUCTS_TTL_LAZY is 20 minutes", () => { + expect(PRODUCTS_TTL_LAZY).toBe(1_200_000); }); // -- Won/Lost status set ------------------------------------------------ diff --git a/utils/testAdjustmentsPoll.ts b/utils/testAdjustmentsPoll.ts new file mode 100644 index 0000000..f9bf654 --- /dev/null +++ b/utils/testAdjustmentsPoll.ts @@ -0,0 +1,552 @@ +import axios from "axios"; + +type JsonObject = Record; + +type SnapshotItem = { + key: string; + serialized: string; + data: JsonObject; + trackedRows: TrackedRow[]; + trackedSignature: string; +}; + +type TrackedRow = { + key: string; + product: string; + onHand: string; + inventory: string; +}; + +const POLL_MS = 60_000; +const CW_BASE_URL = + process.env.CW_BASE_URL ?? + "https://ttscw.totaltech.net/v4_6_release/apis/3.0"; +const ENDPOINT = "/procurement/adjustments?pageSize=1000"; + +const CW_BASIC_TOKEN = process.env.CW_BASIC_TOKEN; +const CW_CLIENT_ID = process.env.CW_CLIENT_ID; + +if (!CW_BASIC_TOKEN || !CW_CLIENT_ID) { + console.error( + "Missing required env vars: CW_BASIC_TOKEN and/or CW_CLIENT_ID", + ); + process.exit(1); +} + +const cw = axios.create({ + baseURL: CW_BASE_URL, + headers: { + Authorization: `Basic ${CW_BASIC_TOKEN}`, + clientId: CW_CLIENT_ID, + "Content-Type": "application/json", + }, + timeout: 30_000, +}); + +const isObject = (value: unknown): value is JsonObject => + typeof value === "object" && value !== null && !Array.isArray(value); + +const stableStringify = (value: unknown): string => { + if (Array.isArray(value)) { + const entries = value.map((entry) => stableStringify(entry)).sort(); + + return `[${entries.join(",")}]`; + } + + if (isObject(value)) { + const keys = Object.keys(value).sort(); + const pairs = keys.map( + (key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`, + ); + + return `{${pairs.join(",")}}`; + } + + return JSON.stringify(value); +}; + +const toObject = (value: unknown): JsonObject => { + if (!isObject(value)) return {}; + + return value; +}; + +const readPathValue = (obj: JsonObject, path: string): unknown => { + const parts = path.split("."); + let current: unknown = obj; + + for (const part of parts) { + if (!isObject(current)) return null; + current = current[part]; + } + + return current; +}; + +const asKey = (value: unknown): string | null => { + if (typeof value === "string" && value.length > 0) return value; + if (typeof value === "number") return value.toString(); + + return null; +}; + +const firstValue = (obj: JsonObject, paths: string[]): unknown => { + for (const path of paths) { + const value = readPathValue(obj, path); + if (value === null || value === undefined || value === "") continue; + + return value; + } + + return null; +}; + +const itemKey = (adjustment: JsonObject): string => { + const keyPaths = [ + "id", + "adjustmentId", + "procurementAdjustmentId", + "recordId", + "recId", + "_info.id", + "_info.href", + ]; + + for (const keyPath of keyPaths) { + const keyValue = asKey(readPathValue(adjustment, keyPath)); + if (keyValue) return keyValue; + } + + return `anon:${stableStringify(adjustment)}`; +}; + +const summarize = (adjustment: JsonObject) => { + const pick = (...paths: string[]) => { + for (const path of paths) { + const value = readPathValue(adjustment, path); + if (value !== null && value !== undefined && value !== "") return value; + } + + return "-"; + }; + + return { + id: pick("id", "adjustmentId", "procurementAdjustmentId", "recordId"), + type: pick( + "type.name", + "type.identifier", + "type.id", + "type", + "adjustmentType.name", + "adjustmentType", + "transactionType.name", + "transactionType", + ), + amount: pick("amount", "value", "total", "quantity"), + status: pick("status.name", "status", "state"), + description: pick("description", "summary", "notes"), + updatedBy: pick("_info.updatedBy", "updatedBy", "lastUpdatedBy"), + lastUpdated: pick("_info.lastUpdated", "lastUpdated", "dateUpdated"), + }; +}; + +const normalizeValue = (value: unknown): string => { + if (value === null || value === undefined || value === "") return "-"; + + return toDisplayValue(value); +}; + +const isMeaningfulQuantity = (value: string) => value !== "-"; + +const toTrackedRow = (detail: JsonObject): TrackedRow | null => { + const productValue = firstValue(detail, [ + "product.name", + "product.identifier", + "product.id", + "item.name", + "item.identifier", + "item.id", + "catalogItem.name", + "catalogItem.identifier", + "catalogItem.id", + "productIdentifier", + "productName", + "sku", + "identifier", + "id", + ]); + + const onHandValue = firstValue(detail, [ + "onHand", + "onHandQty", + "onHandQuantity", + "qtyOnHand", + "quantityOnHand", + "quantity.onHand", + ]); + + const inventoryValue = firstValue(detail, [ + "inventory", + "inventoryQty", + "inventoryLevel", + "quantity", + "qty", + ]); + + const onHand = normalizeValue(onHandValue); + const inventory = normalizeValue(inventoryValue); + const hasMeaningfulQuantity = + isMeaningfulQuantity(onHand) || isMeaningfulQuantity(inventory); + if (!hasMeaningfulQuantity) return null; + + const product = normalizeValue(productValue); + const rowKey = `${product}|${onHand}|${inventory}`; + + return { + key: rowKey, + product, + onHand, + inventory, + }; +}; + +const getTrackedRows = (adjustment: JsonObject): TrackedRow[] => { + const detailCandidates = [ + readPathValue(adjustment, "adjustmentDetails"), + readPathValue(adjustment, "details"), + readPathValue(adjustment, "lineItems"), + ]; + + for (const candidate of detailCandidates) { + if (!Array.isArray(candidate)) continue; + + const rows = candidate + .map((entry) => toTrackedRow(toObject(entry))) + .filter((entry): entry is TrackedRow => entry !== null) + .sort((a, b) => a.key.localeCompare(b.key)); + + if (rows.length > 0) return rows; + } + + const rootRow = toTrackedRow(adjustment); + if (!rootRow) return []; + + return [rootRow]; +}; + +const toDisplayValue = (value: unknown): string => { + if (value === null || value === undefined || value === "") return "-"; + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + return String(value); + } + + if (Array.isArray(value)) { + return `[${value.map((entry) => toDisplayValue(entry)).join(",")}]`; + } + + if (!isObject(value)) return String(value); + + const commonObjectPaths = ["name", "identifier", "id", "value", "code"]; + for (const path of commonObjectPaths) { + const objectValue = readPathValue(value, path); + if (objectValue === null || objectValue === undefined || objectValue === "") + continue; + if (typeof objectValue === "object") continue; + + return String(objectValue); + } + + return stableStringify(value); +}; + +const clean = (value: unknown): string => { + if (value === null || value === undefined || value === "") return "-"; + + return toDisplayValue(value); +}; + +const diffPaths = ( + before: unknown, + after: unknown, + currentPath = "", + paths: string[] = [], + maxPaths = 6, +): string[] => { + if (paths.length >= maxPaths) return paths; + + const beforeIsObject = isObject(before); + const afterIsObject = isObject(after); + if (!beforeIsObject || !afterIsObject) { + if (stableStringify(before) === stableStringify(after)) return paths; + const label = currentPath || "(root)"; + paths.push(label); + + return paths; + } + + const keys = [ + ...new Set([...Object.keys(before), ...Object.keys(after)]), + ].sort(); + for (const key of keys) { + if (paths.length >= maxPaths) return paths; + + const nextPath = currentPath ? `${currentPath}.${key}` : key; + diffPaths(before[key], after[key], nextPath, paths, maxPaths); + } + + return paths; +}; + +const toLine = (kind: "+" | "~" | "-", adjustment: JsonObject): string => { + const s = summarize(adjustment); + + return `${kind} id=${clean(s.id)} type=${clean(s.type)} status=${clean(s.status)} amount=${clean( + s.amount, + )} by=${clean(s.updatedBy)} desc=${clean(s.description)}`; +}; + +const updatedToLine = (before: JsonObject, after: JsonObject): string => { + const prev = summarize(before); + const next = summarize(after); + + const changed: string[] = []; + + if (clean(prev.status) !== clean(next.status)) { + changed.push(`status:${clean(prev.status)}→${clean(next.status)}`); + } + + if (clean(prev.amount) !== clean(next.amount)) { + changed.push(`amount:${clean(prev.amount)}→${clean(next.amount)}`); + } + + if (clean(prev.updatedBy) !== clean(next.updatedBy)) { + changed.push(`by:${clean(prev.updatedBy)}→${clean(next.updatedBy)}`); + } + + if (clean(prev.description) !== clean(next.description)) { + changed.push(`desc:${clean(prev.description)}→${clean(next.description)}`); + } + + if (clean(prev.lastUpdated) !== clean(next.lastUpdated)) { + changed.push( + `updated:${clean(prev.lastUpdated)}→${clean(next.lastUpdated)}`, + ); + } + + const noisyFields = new Set(["_info.lastUpdated"]); + const rawDiffs = diffPaths(before, after).filter( + (path) => !noisyFields.has(path), + ); + const rawDelta = + rawDiffs.length > 0 ? `fields:${rawDiffs.join(",")}` : "content changed"; + const delta = changed.length > 0 ? changed.join(" | ") : rawDelta; + + return `~ id=${clean(next.id)} type=${clean(next.type)} ${delta}`; +}; + +const formatTracked = (row: TrackedRow) => + `product=${row.product} onHand=${row.onHand} inventory=${row.inventory}`; + +const trackedAddedLine = (item: SnapshotItem) => { + const base = summarize(item.data); + const rows = item.trackedRows.slice(0, 3).map(formatTracked).join(" ; "); + const more = + item.trackedRows.length > 3 + ? ` ; +${item.trackedRows.length - 3} more` + : ""; + + return `+ id=${clean(base.id)} type=${clean(base.type)} ${rows}${more}`; +}; + +const trackedRemovedLine = (item: SnapshotItem) => { + const base = summarize(item.data); + const rows = item.trackedRows.slice(0, 3).map(formatTracked).join(" ; "); + const more = + item.trackedRows.length > 3 + ? ` ; +${item.trackedRows.length - 3} more` + : ""; + + return `- id=${clean(base.id)} type=${clean(base.type)} ${rows}${more}`; +}; + +const trackedUpdatedLine = ( + beforeItem: SnapshotItem, + afterItem: SnapshotItem, +) => { + const base = summarize(afterItem.data); + const beforeMap = new Map( + beforeItem.trackedRows.map((row) => [row.product, row]), + ); + const afterMap = new Map( + afterItem.trackedRows.map((row) => [row.product, row]), + ); + const productKeys = [ + ...new Set([...beforeMap.keys(), ...afterMap.keys()]), + ].sort(); + + const deltas: string[] = []; + for (const product of productKeys) { + const prev = beforeMap.get(product); + const next = afterMap.get(product); + if (!prev && next) { + deltas.push( + `${product} added(onHand=${next.onHand},inventory=${next.inventory})`, + ); + continue; + } + if (prev && !next) { + deltas.push(`${product} removed`); + continue; + } + if (!prev || !next) continue; + const onHandChanged = prev.onHand !== next.onHand; + const inventoryChanged = prev.inventory !== next.inventory; + if (!onHandChanged && !inventoryChanged) continue; + + const parts: string[] = []; + onHandChanged ? parts.push(`onHand:${prev.onHand}→${next.onHand}`) : null; + inventoryChanged + ? parts.push(`inventory:${prev.inventory}→${next.inventory}`) + : null; + deltas.push(`${product} ${parts.join(",")}`); + } + + const shown = deltas.slice(0, 3).join(" ; "); + const more = deltas.length > 3 ? ` ; +${deltas.length - 3} more` : ""; + const changes = shown || "inventory/on-hand changed"; + + return `~ id=${clean(base.id)} type=${clean(base.type)} ${changes}${more}`; +}; + +const toSnapshot = (rows: unknown[]): SnapshotItem[] => + rows.map((row) => { + const data = toObject(row); + const trackedRows = getTrackedRows(data); + const trackedSignature = stableStringify(trackedRows); + + return { + key: itemKey(data), + serialized: stableStringify(data), + data, + trackedRows, + trackedSignature, + }; + }); + +const fetchAdjustments = async (): Promise => { + const response = await cw.get(ENDPOINT); + const payload = response.data; + + if (Array.isArray(payload)) return payload; + if (isObject(payload) && Array.isArray(payload.data)) return payload.data; + + return []; +}; + +const now = () => new Date().toISOString(); + +let previous = new Map(); + +const run = async () => { + console.log( + `[${now()}] Watching ${CW_BASE_URL}${ENDPOINT} every ${POLL_MS / 1000}s`, + ); + + while (true) { + try { + const rows = await fetchAdjustments(); + const snapshotItems = toSnapshot(rows); + const current = new Map(snapshotItems.map((item) => [item.key, item])); + + if (previous.size === 0) { + previous = current; + console.log( + `[${now()}] Baseline captured (${current.size} adjustments)`, + ); + await Bun.sleep(POLL_MS); + continue; + } + + const added: SnapshotItem[] = []; + const removed: SnapshotItem[] = []; + const updated: Array<{ before: SnapshotItem; after: SnapshotItem }> = []; + + for (const [key, item] of current) { + const previousItem = previous.get(key); + + if (!previousItem) { + if (item.trackedRows.length === 0) continue; + added.push(item); + continue; + } + + const hasTrackedRows = + item.trackedRows.length > 0 || previousItem.trackedRows.length > 0; + if (!hasTrackedRows) continue; + + if (previousItem.trackedSignature !== item.trackedSignature) { + updated.push({ before: previousItem, after: item }); + } + } + + for (const [key, item] of previous) { + if (!current.has(key) && item.trackedRows.length > 0) + removed.push(item); + } + + if (added.length > 0 || updated.length > 0 || removed.length > 0) { + console.log(`\n[${now()}] Changes detected:`); + console.log(`- added: ${added.length}`); + console.log(`- updated: ${updated.length}`); + console.log(`- removed: ${removed.length}`); + + if (added.length > 0) { + console.log("\nAdded:"); + for (const item of added) { + console.log(trackedAddedLine(item)); + } + } + + if (updated.length > 0) { + console.log("\nUpdated:"); + for (const item of updated) { + console.log(trackedUpdatedLine(item.before, item.after)); + } + } + + if (removed.length > 0) { + console.log("\nRemoved:"); + for (const item of removed) { + console.log(trackedRemovedLine(item)); + } + } + } + + if (added.length === 0 && updated.length === 0 && removed.length === 0) { + console.log(`[${now()}] No changes (${current.size} adjustments)`); + } + + previous = current; + } catch (error) { + if (axios.isAxiosError(error)) { + console.error( + `[${now()}] Poll failed: ${error.response?.status ?? "ERR"}`, + error.message, + ); + } else { + console.error(`[${now()}] Poll failed:`, error); + } + } + + await Bun.sleep(POLL_MS); + } +}; + +run().catch((error) => { + console.error("Watcher crashed:", error); + process.exit(1); +}); diff --git a/utils/testWebserver.ts b/utils/testWebserver.ts new file mode 100644 index 0000000..e2de00c --- /dev/null +++ b/utils/testWebserver.ts @@ -0,0 +1,217 @@ +import { appendFile, mkdir } from "node:fs/promises"; + +const port = 3001; +const logDir = "cw-api-logs"; +const logFilePath = `${logDir}/test-webserver-${new Date().toISOString().replace(/[:.]/g, "-")}.jsonl`; + +const jsonBodyMethods = ["POST", "PUT", "PATCH", "DELETE"]; + +type ParsedJson = Record | unknown[]; +type EventSummary = ReturnType; + +const color = { + reset: "\x1b[0m", + bold: "\x1b[1m", + dim: "\x1b[2m", + cyan: "\x1b[36m", + blue: "\x1b[34m", + yellow: "\x1b[33m", + magenta: "\x1b[35m", + green: "\x1b[32m", + gray: "\x1b[90m", +}; + +const paint = (value: string, tone: keyof typeof color) => + `${color[tone]}${value}${color.reset}`; + +const safeParseJson = (value: string): ParsedJson | null => { + try { + const parsed = JSON.parse(value); + const isObject = typeof parsed === "object" && parsed !== null; + + return isObject ? (parsed as ParsedJson) : null; + } catch { + return null; + } +}; + +const parseEntity = (value: unknown): ParsedJson | null => { + if (typeof value === "string") return safeParseJson(value); + if (typeof value !== "object" || value === null) return null; + + return value as ParsedJson; +}; + +const asObject = (value: ParsedJson | null): Record | null => { + if (!value) return null; + if (Array.isArray(value)) return null; + + return value; +}; + +const parseJsonStringFields = ( + value: Record | null, +): Record | null => { + if (!value) return null; + + return Object.entries(value).reduce>( + (acc, [key, current]) => { + if (typeof current !== "string") { + acc[key] = current; + + return acc; + } + + const looksLikeJson = current.startsWith("{") || current.startsWith("["); + if (!looksLikeJson) { + acc[key] = current; + + return acc; + } + + const parsed = safeParseJson(current); + acc[key] = parsed ?? current; + + return acc; + }, + {}, + ); +}; + +const parseQuery = (url: URL) => { + const entries = [...url.searchParams.entries()]; + const params = entries.reduce>((acc, [key, value]) => { + acc[key] = value; + + return acc; + }, {}); + const rawQuery = url.search.startsWith("?") + ? url.search.slice(1) + : url.search; + const firstSegment = rawQuery.split("&")[0] ?? ""; + const hasEquals = firstSegment.includes("="); + const inferredId = !hasEquals && firstSegment ? firstSegment : null; + + return { + params, + inferredId, + }; +}; + +const buildSummary = ( + parsedBody: Record | null, + parsedEntity: Record | null, +) => { + if (!parsedBody) return null; + + return { + messageId: parsedBody.MessageId ?? null, + action: parsedBody.Action ?? null, + type: parsedBody.Type ?? null, + id: parsedBody.ID ?? null, + memberId: parsedBody.MemberId ?? null, + entityStatus: + parsedEntity?.StatusName ?? + parsedEntity?.TicketStatus ?? + parsedEntity?.Status ?? + null, + entitySummary: parsedEntity?.Summary ?? parsedEntity?.CompanyName ?? null, + entityUpdatedBy: parsedEntity?.UpdatedBy ?? null, + entityLastUpdated: + parsedEntity?.LastUpdatedUTC ?? parsedEntity?.LastUpdated ?? null, + }; +}; + +const displayTerminalEvent = ( + method: string, + routePath: string, + query: { params: Record; inferredId: string | null }, + summary: EventSummary, + timestamp: string, +) => { + const id = String(summary?.id ?? query.inferredId ?? "-"); + const action = String(summary?.action ?? query.params.action ?? "-"); + const eventType = String(summary?.type ?? routePath.split("/")[1] ?? "-"); + const actor = String( + summary?.entityUpdatedBy ?? + query.params.memberId ?? + summary?.memberId ?? + "-", + ); + const status = String(summary?.entityStatus ?? "-"); + const title = String(summary?.entitySummary ?? "-"); + + const methodTone = method === "GET" ? "green" : "yellow"; + + console.log(); + console.log( + `${paint("●", "cyan")} ${paint(method, methodTone)} ${paint(routePath, "blue")} ${paint(timestamp, "gray")}`, + ); + console.log( + `${paint("type", "magenta")}: ${paint(eventType, "bold")} ${paint("action", "magenta")}: ${action} ${paint("id", "magenta")}: ${id}`, + ); + console.log( + `${paint("actor", "magenta")}: ${paint(actor, "cyan")} ${paint("status", "magenta")}: ${status}`, + ); + console.log(`${paint("title", "magenta")}: ${title}`); +}; + +const writeLogRecord = async (record: Record) => { + await appendFile(logFilePath, `${JSON.stringify(record)}\n`, "utf8"); +}; + +await mkdir(logDir, { recursive: true }); + +Bun.serve({ + port, + async fetch(request) { + const url = new URL(request.url); + const routePath = `${url.pathname}${url.search}`; + const method = request.method; + const query = parseQuery(url); + const startedAt = new Date().toISOString(); + const rawBody = jsonBodyMethods.includes(method) + ? await request.text() + : ""; + const parsedJson = safeParseJson(rawBody); + const parsedBody = asObject(parsedJson); + const parsedBodyExpanded = parseJsonStringFields(parsedBody); + const parsedEntity = asObject(parseEntity(parsedBodyExpanded?.Entity)); + const summary = buildSummary(parsedBodyExpanded, parsedEntity); + + const responseBody = { + success: true, + method, + path: routePath, + timestamp: startedAt, + summary, + }; + + const responseStatus = 200; + + displayTerminalEvent(method, routePath, query, summary, startedAt); + + await writeLogRecord({ + timestamp: startedAt, + request: { + method, + path: routePath, + query, + headers: Object.fromEntries(request.headers.entries()), + bodyRaw: rawBody || null, + bodyParsed: parsedBodyExpanded, + entityParsed: parsedEntity, + summary, + }, + response: { + status: responseStatus, + body: responseBody, + }, + }); + + return Response.json(responseBody, { status: responseStatus }); + }, +}); + +console.log(`Test webserver listening on http://localhost:${port}`); +console.log(`Response/request log file: ${logFilePath}`);