#!/usr/bin/env python3
"""
Analyze ConnectWise API call logs.

Looks for the most recent log file in cw-api-logs/ by default, or accepts an
explicit path as an argument.

Usage:
    python3 analyze-cw-calls.py                            # latest file in cw-api-logs/
    python3 analyze-cw-calls.py cw-api-logs/specific.jsonl
"""
import json
import sys
import os
import glob
import statistics
from collections import Counter, defaultdict
from datetime import datetime, timedelta

# ── Colours ──────────────────────────────────────────────────────────────────
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"


def colour_duration(ms: float) -> str:
    """Colour a millisecond value green/yellow/red by severity."""
    if ms >= 10_000:
        return f"{RED}{ms:,.0f}ms{RESET}"
    if ms >= 5_000:
        return f"{YELLOW}{ms:,.0f}ms{RESET}"
    return f"{GREEN}{ms:,.0f}ms{RESET}"


def header(title: str) -> str:
    return f"\n{BOLD}{CYAN}{'─' * 60}\n  {title}\n{'─' * 60}{RESET}"


# ── Resolve log file ─────────────────────────────────────────────────────────
def find_latest_log() -> str:
    """Find the most recent .jsonl file in cw-api-logs/."""
    log_dir = os.path.join(os.getcwd(), "cw-api-logs")
    files = glob.glob(os.path.join(log_dir, "*.jsonl"))
    if not files:
        print(f"{RED}No log files found in cw-api-logs/{RESET}")
        print(f"Run {BOLD}bun run dev:log{RESET} to start logging CW API calls.")
        sys.exit(1)
    # Pick by mtime rather than lexical filename order, so "most recent"
    # holds even when log names do not sort chronologically.
    return max(files, key=os.path.getmtime)


if len(sys.argv) > 1:
    log_path = sys.argv[1]
else:
    log_path = find_latest_log()

print(f"{DIM}Reading: {log_path}{RESET}")

entries = []
parse_errors = 0
with open(log_path) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            parse_errors += 1

if not entries:
    print("No entries found. Check the log file path.")
    sys.exit(1)

# ── Derived fields ───────────────────────────────────────────────────────────
durations = [e["durationMs"] for e in entries]
errors = [e for e in entries if e.get("error")]
successes = [e for e in entries if not e.get("error")]
timestamps = [datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")) for e in entries]
time_span = (timestamps[-1] - timestamps[0]) if len(timestamps) > 1 else timedelta(0)


# Normalise the URL to a route pattern for grouping: numeric path segments
# (record IDs) collapse to ":id".
def normalise_url(url: str) -> str:
    path = url.split("?")[0].rstrip("/")
    return "/".join(":id" if p.isdigit() else p for p in path.split("/"))


# ── 1. Overview ──────────────────────────────────────────────────────────────
print(header("OVERVIEW"))
print(f"  Log file     : {log_path}")
print(f"  Total calls  : {BOLD}{len(entries):,}{RESET}")
print(f"  Successes    : {GREEN}{len(successes):,}{RESET}")
print(f"  Failures     : {RED}{len(errors):,}{RESET} ({len(errors)/len(entries)*100:.1f}%)")
print(f"  Time span    : {time_span}")
if time_span.total_seconds() > 0:
    rps = len(entries) / time_span.total_seconds()
    print(f"  Avg req/sec  : {rps:.2f}")
if parse_errors:
    print(f"  Parse errors : {YELLOW}{parse_errors}{RESET}")
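# For reference, each JSONL entry is assumed to look roughly like the record
# below. Field names are inferred from the accesses in this script; the URL is
# illustrative and the actual logger may emit additional fields.
#
#   {"timestamp": "2025-01-15T09:30:00.123Z", "method": "GET",
#    "url": "https://host/v4_6_release/apis/3.0/service/tickets/123",
#    "status": 200, "durationMs": 842.5, "error": null}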
# ── 2. Duration stats ────────────────────────────────────────────────────────
print(header("DURATION STATS (all calls)"))
sorted_dur = sorted(durations)
# Nearest-rank-style percentiles: index straight into the sorted list.
p50 = sorted_dur[len(sorted_dur) * 50 // 100]
p90 = sorted_dur[len(sorted_dur) * 90 // 100]
p95 = sorted_dur[len(sorted_dur) * 95 // 100]
p99 = sorted_dur[len(sorted_dur) * 99 // 100]
print(f"  Min          : {colour_duration(min(durations))}")
print(f"  Max          : {colour_duration(max(durations))}")
print(f"  Mean         : {colour_duration(statistics.mean(durations))}")
print(f"  Median (p50) : {colour_duration(p50)}")
print(f"  p90          : {colour_duration(p90)}")
print(f"  p95          : {colour_duration(p95)}")
print(f"  p99          : {colour_duration(p99)}")
if len(durations) > 1:  # stdev needs at least two samples
    print(f"  Std dev      : {statistics.stdev(durations):,.0f}ms")

# Duration buckets
buckets = {"<500ms": 0, "500ms-1s": 0, "1-3s": 0, "3-5s": 0, "5-10s": 0, "10-20s": 0, "20s+": 0}
for d in durations:
    if d < 500:
        buckets["<500ms"] += 1
    elif d < 1000:
        buckets["500ms-1s"] += 1
    elif d < 3000:
        buckets["1-3s"] += 1
    elif d < 5000:
        buckets["3-5s"] += 1
    elif d < 10000:
        buckets["5-10s"] += 1
    elif d < 20000:
        buckets["10-20s"] += 1
    else:
        buckets["20s+"] += 1

print(f"\n  {BOLD}Distribution:{RESET}")
max_bar = 40
max_count = max(buckets.values()) if buckets else 1
for label, count in buckets.items():
    bar_len = int(count / max_count * max_bar) if max_count else 0
    pct = count / len(durations) * 100
    bar = "█" * bar_len
    clr = GREEN if "500" in label or "<" in label else (YELLOW if "1-3" in label or "3-5" in label else RED)
    print(f"  {label:>10s} {clr}{bar}{RESET} {count:>5,} ({pct:5.1f}%)")

# ── 3. Errors breakdown ──────────────────────────────────────────────────────
print(header("ERROR BREAKDOWN"))
if not errors:
    print(f"  {GREEN}No errors! 🎉{RESET}")
else:
    error_codes = Counter()
    for e in errors:
        err_str = e.get("error", "unknown")
        code = err_str.split(":")[0] if ":" in err_str else err_str
        error_codes[code] += 1
    for code, count in error_codes.most_common():
        print(f"  {RED}{code:<30s}{RESET} {count:>5,} ({count/len(entries)*100:.1f}%)")

    # Errored URLs
    errored_urls = Counter(normalise_url(e["url"]) for e in errors)
    print(f"\n  {BOLD}Top errored endpoints:{RESET}")
    for url, count in errored_urls.most_common(10):
        print(f"    {count:>5,} {url}")

# ── 4. Slowest individual calls ──────────────────────────────────────────────
print(header("TOP 20 SLOWEST CALLS"))
slowest = sorted(entries, key=lambda e: e["durationMs"], reverse=True)[:20]
for i, e in enumerate(slowest, 1):
    status = e.get("status") or f"{RED}ERR{RESET}"
    err_tag = f" {RED}[{e['error'].split(':')[0]}]{RESET}" if e.get("error") else ""
    print(f"  {i:>2}. {colour_duration(e['durationMs']):>20s} {e['method']:>4s} {e['url'][:60]:<60s} {DIM}{status}{RESET}{err_tag}")
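# Illustration of the route grouping that the per-endpoint stats below rely
# on. The sample URL is hypothetical; normalise_url only collapses all-digit
# path segments:
#
#   >>> normalise_url("https://host/apis/3.0/service/tickets/12345?fields=id")
#   'https://host/apis/3.0/service/tickets/:id'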
# ── 5. Per-endpoint stats ────────────────────────────────────────────────────
print(header("PER-ENDPOINT STATS (by route pattern)"))
by_route = defaultdict(list)
for e in entries:
    route = normalise_url(e["url"])
    by_route[route].append(e)

# Sort by total time spent descending (most impactful)
route_stats = []
for route, calls in by_route.items():
    durs = [c["durationMs"] for c in calls]
    errs = sum(1 for c in calls if c.get("error"))
    sorted_d = sorted(durs)
    route_stats.append({
        "route": route,
        "count": len(calls),
        "errors": errs,
        "total_ms": sum(durs),
        "mean": statistics.mean(durs),
        "p50": sorted_d[len(sorted_d) * 50 // 100],
        "p95": sorted_d[len(sorted_d) * 95 // 100],
        "max": max(durs),
    })
route_stats.sort(key=lambda r: r["total_ms"], reverse=True)

print(f"  {'Route':<55s} {'Count':>6s} {'Errs':>5s} {'Mean':>8s} {'p50':>8s} {'p95':>8s} {'Max':>8s} {'Total':>10s}")
print(f"  {'─' * 55} {'─' * 6} {'─' * 5} {'─' * 8} {'─' * 8} {'─' * 8} {'─' * 8} {'─' * 10}")
for r in route_stats[:25]:
    # Width 14 leaves ~5 visible chars once the invisible ANSI codes are counted.
    err_str = f"{RED}{r['errors']}{RESET}" if r['errors'] else f"{DIM}0{RESET}"
    print(
        f"  {r['route']:<55s} {r['count']:>6,} {err_str:>14s} "
        f"{r['mean']:>7,.0f}ms {r['p50']:>7,.0f}ms {r['p95']:>7,.0f}ms "
        f"{r['max']:>7,.0f}ms {r['total_ms']/1000:>8,.1f}s"
    )

# ── 6. HTTP method breakdown ─────────────────────────────────────────────────
print(header("BY HTTP METHOD"))
by_method = defaultdict(list)
for e in entries:
    by_method[e["method"]].append(e["durationMs"])

print(f"  {'Method':<8s} {'Count':>7s} {'Mean':>9s} {'p95':>9s} {'Max':>9s}")
print(f"  {'─' * 8} {'─' * 7} {'─' * 9} {'─' * 9} {'─' * 9}")
for method in sorted(by_method.keys()):
    durs = by_method[method]
    sd = sorted(durs)
    print(
        f"  {method:<8s} {len(durs):>7,} "
        f"{statistics.mean(durs):>8,.0f}ms "
        f"{sd[len(sd)*95//100]:>8,.0f}ms "
        f"{max(durs):>8,.0f}ms"
    )

# ── 7. Timeline (calls per minute) ───────────────────────────────────────────
if time_span.total_seconds() > 60:
    print(header("TIMELINE (per-minute throughput & errors)"))
    by_minute = defaultdict(lambda: {"count": 0, "errors": 0, "dur_sum": 0})
    for e in entries:
        ts = e["timestamp"][:16]  # YYYY-MM-DDTHH:MM
        by_minute[ts]["count"] += 1
        by_minute[ts]["dur_sum"] += e["durationMs"]
        if e.get("error"):
            by_minute[ts]["errors"] += 1
    for minute in sorted(by_minute.keys()):
        m = by_minute[minute]
        avg = m["dur_sum"] / m["count"] if m["count"] else 0
        err_part = f" {RED}({m['errors']} errs){RESET}" if m["errors"] else ""
        bar = "▓" * min(m["count"] // 5, 50)
        avg_clr = colour_duration(avg)
        print(f"  {minute} {m['count']:>5,} reqs avg {avg_clr:>20s} {bar}{err_part}")

# ── 8. Concurrency hotspots ──────────────────────────────────────────────────
print(header("CONCURRENCY HOTSPOTS (calls starting within 100ms of each other)"))
# Assumes entries appear in chronological order (an append-only log).
ts_ms = [int(t.timestamp() * 1000) for t in timestamps]
bursts = []
i = 0
while i < len(ts_ms):
    j = i
    while j < len(ts_ms) - 1 and ts_ms[j + 1] - ts_ms[j] < 100:
        j += 1
    burst_size = j - i + 1
    if burst_size >= 5:
        burst_entries = entries[i:j + 1]
        avg_dur = statistics.mean(e["durationMs"] for e in burst_entries)
        bursts.append((burst_size, entries[i]["timestamp"], avg_dur, burst_entries))
    i = j + 1

bursts.sort(key=lambda b: b[0], reverse=True)
if bursts:
    print(f"  Found {len(bursts)} burst(s) of ≥5 concurrent requests\n")
    for size, ts, avg, _ in bursts[:10]:
        print(f"  {YELLOW}{size:>3} concurrent{RESET} at {ts} avg {colour_duration(avg)}")
else:
    print(f"  {GREEN}No major concurrency bursts detected.{RESET}")
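# Optional: dump the per-route stats to CSV for spreadsheet analysis. This is
# a sketch, not part of the original tooling; CW_STATS_CSV is a hypothetical
# env var naming the output path.
csv_path = os.environ.get("CW_STATS_CSV")
if csv_path:
    import csv
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(
            f, fieldnames=["route", "count", "errors", "total_ms", "mean", "p50", "p95", "max"]
        )
        writer.writeheader()
        writer.writerows(route_stats)  # route_stats built in section 5
    print(f"{DIM}Wrote per-route stats to {csv_path}{RESET}")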
# ── 9. Summary / recommendations ─────────────────────────────────────────────
print(header("SUMMARY"))
err_rate = len(errors) / len(entries) * 100
slow_5s = sum(1 for d in durations if d >= 5000)
slow_pct = slow_5s / len(entries) * 100

if err_rate > 5:
    print(f"  {RED}⚠ Error rate is {err_rate:.1f}% — CW API is struggling{RESET}")
elif err_rate > 1:
    print(f"  {YELLOW}⚠ Error rate is {err_rate:.1f}% — some instability{RESET}")
else:
    print(f"  {GREEN}✓ Error rate is {err_rate:.1f}% — acceptable{RESET}")

if slow_pct > 10:
    print(f"  {RED}⚠ {slow_5s:,} calls ({slow_pct:.1f}%) took >5s — CW is slow or rate-limiting{RESET}")
elif slow_pct > 2:
    print(f"  {YELLOW}⚠ {slow_5s:,} calls ({slow_pct:.1f}%) took >5s{RESET}")
else:
    print(f"  {GREEN}✓ Only {slow_5s:,} calls ({slow_pct:.1f}%) over 5s{RESET}")

if bursts:
    max_burst = max(b[0] for b in bursts)
    print(f"  {YELLOW}⚠ Max concurrency burst: {max_burst} simultaneous requests — consider lowering CONCURRENCY{RESET}")

# Durations are summed per call, so concurrent requests overlap and this
# total can exceed the actual wall-clock span of the log.
total_time_s = sum(durations) / 1000
print(f"\n  Cumulative time spent waiting on CW: {BOLD}{total_time_s:,.1f}s{RESET} ({total_time_s/60:,.1f} min)")
print()
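# Optional: exit non-zero when the log looks unhealthy, so this script can
# gate CI or cron alerts. The CW_FAIL_ON_ERRORS flag and the 5% threshold
# are assumptions, not part of the original workflow.
if os.environ.get("CW_FAIL_ON_ERRORS") and err_rate > 5:
    sys.exit(1)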