#!/usr/bin/env python3 import argparse import json from collections import Counter, defaultdict from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any def parse_iso(value: str | None) -> datetime | None: if not value: return None normalized = value.replace("Z", "+00:00") try: return datetime.fromisoformat(normalized) except ValueError: return None def first_non_empty(*values: Any) -> str: for value in values: if value is None: continue if isinstance(value, str) and value.strip() == "": continue return str(value) return "unknown" def top_lines(counter: Counter[str], limit: int) -> list[str]: return [f"{k}: {v}" for k, v in counter.most_common(limit)] def fmt_pct(part: int, total: int) -> str: if total == 0: return "0.0%" return f"{(part / total) * 100:.1f}%" def human_duration(start: datetime | None, end: datetime | None) -> str: if start is None or end is None: return "unknown" delta = end - start total_seconds = int(delta.total_seconds()) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours}h {minutes}m {seconds}s" def truncate(value: str, max_len: int = 90) -> str: if len(value) <= max_len: return value return value[: max_len - 1] + "…" def add_section(lines: list[str], title: str) -> None: lines.append("") lines.append(title) lines.append("-" * len(title)) def supports_color(enabled: bool) -> bool: return enabled def paint(text: str, code: str, use_color: bool) -> str: if not use_color: return text return f"\033[{code}m{text}\033[0m" def good_bad_neutral(value: str, state: str, use_color: bool) -> str: if state == "good": return paint(value, "32", use_color) if state == "bad": return paint(value, "31", use_color) return paint(value, "36", use_color) def add_ranked_counter( lines: list[str], title: str, counter: Counter[str], top_n: int, total: int, truncate_labels: bool = False, ) -> None: lines.append(f"• {title}") items = counter.most_common(top_n) if not items: lines.append(" (no data)") return for index, (key, count) in enumerate(items, start=1): label = truncate(key) if truncate_labels else key lines.append(f" {index:>2}. {label:<90} {count:>4} {fmt_pct(count, total):>6}") def stream_row_summary(row: dict[str, Any], use_color: bool, max_path: int) -> str: request = row.get("request") or {} response = row.get("response") or {} body_parsed = request.get("bodyParsed") or {} entity_parsed = request.get("entityParsed") or {} summary = request.get("summary") or {} timestamp = parse_iso(row.get("timestamp")) time_label = timestamp.astimezone(timezone.utc).strftime("%H:%M:%S") if timestamp else "--:--:--" method = first_non_empty(request.get("method")) path = first_non_empty(request.get("path")) endpoint = path.split("?", 1)[0] status_code = first_non_empty(response.get("status")) event_type = first_non_empty(body_parsed.get("Type"), summary.get("type")) action = first_non_empty( body_parsed.get("Action"), summary.get("action"), request.get("query", {}).get("params", {}).get("action"), ) item_id = first_non_empty(body_parsed.get("ID"), summary.get("id"), request.get("query", {}).get("inferredId")) actor = first_non_empty( request.get("query", {}).get("params", {}).get("memberId"), summary.get("entityUpdatedBy"), entity_parsed.get("UpdatedBy"), ) entity_status = first_non_empty(entity_parsed.get("StatusName"), summary.get("entityStatus")) endpoint_label = truncate(endpoint, max_path) status_state = "good" if status_code.startswith("2") else "bad" status_colored = good_bad_neutral(status_code, status_state, use_color) event_colored = paint(f"{event_type}.{action}", "36", use_color) endpoint_colored = paint(endpoint_label, "94", use_color) return ( f"[{time_label}] {method:<4} {endpoint_colored:<20} " f"{status_colored:>3} {event_colored:<22} " f"id={item_id:<7} actor={truncate(actor, 16):<16} status={truncate(entity_status, 22)}" ) def endpoint_stream_summary(log_path: Path, use_color: bool, max_path: int) -> str: lines: list[str] = [] lines.append(paint("ENDPOINT STREAM (chronological)", "1;95", use_color)) lines.append(paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color)) count = 0 invalid = 0 with log_path.open("r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line: continue try: row = json.loads(line) except json.JSONDecodeError: invalid += 1 continue lines.append(stream_row_summary(row, use_color=use_color, max_path=max_path)) count += 1 lines.append(paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color)) lines.append( f"events={count} invalid={good_bad_neutral(str(invalid), 'good' if invalid == 0 else 'bad', use_color)}" ) return "\n".join(lines) @dataclass class LogStats: total_rows: int = 0 parsed_rows: int = 0 invalid_rows: int = 0 earliest: datetime | None = None latest: datetime | None = None methods: Counter[str] = None # type: ignore[assignment] paths: Counter[str] = None # type: ignore[assignment] endpoint_roots: Counter[str] = None # type: ignore[assignment] response_statuses: Counter[str] = None # type: ignore[assignment] event_types: Counter[str] = None # type: ignore[assignment] actions: Counter[str] = None # type: ignore[assignment] type_action_combo: Counter[str] = None # type: ignore[assignment] company_ids: Counter[str] = None # type: ignore[assignment] source_members: Counter[str] = None # type: ignore[assignment] actor_members: Counter[str] = None # type: ignore[assignment] entity_updated_by: Counter[str] = None # type: ignore[assignment] requests_by_hour: Counter[str] = None # type: ignore[assignment] requests_by_minute: Counter[str] = None # type: ignore[assignment] endpoint_by_hour: dict[str, Counter[str]] = None # type: ignore[assignment] def __post_init__(self) -> None: self.methods = Counter() self.paths = Counter() self.endpoint_roots = Counter() self.response_statuses = Counter() self.event_types = Counter() self.actions = Counter() self.type_action_combo = Counter() self.company_ids = Counter() self.source_members = Counter() self.actor_members = Counter() self.entity_updated_by = Counter() self.requests_by_hour = Counter() self.requests_by_minute = Counter() self.endpoint_by_hour = defaultdict(Counter) def add_timestamp(self, timestamp: datetime | None) -> None: if timestamp is None: return self.earliest = timestamp if self.earliest is None else min(self.earliest, timestamp) self.latest = timestamp if self.latest is None else max(self.latest, timestamp) hour_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC") minute_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") self.requests_by_hour[hour_bucket] += 1 self.requests_by_minute[minute_bucket] += 1 def summarize(self, top_n: int, busiest_n: int, use_color: bool) -> str: duration_line = human_duration(self.earliest, self.latest) time_range_line = "unknown" if self.earliest and self.latest: time_range_line = f"{self.earliest.isoformat()} → {self.latest.isoformat()}" total_requests = self.parsed_rows success_count = self.response_statuses.get("200", 0) success_pct = fmt_pct(success_count, sum(self.response_statuses.values())) invalid_state = "good" if self.invalid_rows == 0 else "bad" top_endpoints = self.endpoint_roots.most_common(2) top_users = self.actor_members.most_common(3) top_minutes = self.requests_by_minute.most_common(busiest_n) lines: list[str] = [] lines.append(paint("WEBHOOK SNAPSHOT", "1;95", use_color)) lines.append(paint("────────────────────────────────────────────────────────", "90", use_color)) lines.append( " " + paint("Rows", "1;97", use_color) + f": {self.total_rows:<4} " + paint("Parsed", "1;97", use_color) + f": {self.parsed_rows:<4} " + paint("Invalid", "1;97", use_color) + f": {good_bad_neutral(str(self.invalid_rows), invalid_state, use_color)}" ) lines.append( " " + paint("Window", "1;97", use_color) + f": {duration_line:<12} " + paint("Success", "1;97", use_color) + f": {good_bad_neutral(success_pct, 'good' if success_count else 'neutral', use_color)}" ) lines.append(" " + paint("UTC Range", "1;97", use_color) + f": {time_range_line}") lines.append("") lines.append(paint("Top Endpoints", "1;94", use_color)) if top_endpoints: for endpoint, count in top_endpoints: lines.append(f" • {endpoint:<14} {count:>4} ({fmt_pct(count, total_requests)})") if not top_endpoints: lines.append(" • (no data)") lines.append("") lines.append(paint("Most Active Users (query memberId)", "1;94", use_color)) if top_users: for user, count in top_users: lines.append(f" • {user:<18} {count:>4} ({fmt_pct(count, total_requests)})") if not top_users: lines.append(" • (no data)") lines.append("") lines.append(paint("Busiest Minutes", "1;94", use_color)) if top_minutes: for minute, count in top_minutes: lines.append(f" • {minute:<22} {count:>3}") if not top_minutes: lines.append(" • (no data)") lines.append("") lines.append(paint("Request Mix", "1;94", use_color)) method_line = ", ".join([f"{k}:{v}" for k, v in self.methods.most_common(3)]) or "(no data)" event_line = ", ".join([f"{k}:{v}" for k, v in self.event_types.most_common(3)]) or "(no data)" action_line = ", ".join([f"{k}:{v}" for k, v in self.actions.most_common(3)]) or "(no data)" lines.append(f" • Methods : {method_line}") lines.append(f" • Types : {event_line}") lines.append(f" • Actions : {action_line}") lines.append("") lines.append(paint("Status Codes", "1;94", use_color)) if self.response_statuses: status_total = sum(self.response_statuses.values()) for status, count in self.response_statuses.most_common(5): state = "good" if status.startswith("2") else "bad" status_label = good_bad_neutral(status, state, use_color) lines.append(f" • {status_label}: {count} ({fmt_pct(count, status_total)})") if not self.response_statuses: lines.append(" • (no data)") return "\n".join(lines) def update_stats(stats: LogStats, row: dict[str, Any]) -> None: timestamp = parse_iso(row.get("timestamp")) stats.add_timestamp(timestamp) request = row.get("request") or {} response = row.get("response") or {} body_parsed = request.get("bodyParsed") or {} entity_parsed = request.get("entityParsed") or {} method = first_non_empty(request.get("method")) path = first_non_empty(request.get("path")) endpoint_root = path.split("?", 1)[0] status = first_non_empty(response.get("status")) event_type = first_non_empty( body_parsed.get("Type"), request.get("summary", {}).get("type"), ) action = first_non_empty( body_parsed.get("Action"), request.get("summary", {}).get("action"), request.get("query", {}).get("params", {}).get("action"), ) combo = f"{event_type}:{action}" source_member = first_non_empty( body_parsed.get("MemberId"), request.get("summary", {}).get("memberId"), ) actor_member = first_non_empty( request.get("query", {}).get("params", {}).get("memberId"), request.get("summary", {}).get("entityUpdatedBy"), ) updated_by = first_non_empty( entity_parsed.get("UpdatedBy"), request.get("summary", {}).get("entityUpdatedBy"), ) company_id = first_non_empty(body_parsed.get("CompanyId"), request.get("headers", {}).get("companyname")) stats.methods[method] += 1 stats.paths[path] += 1 stats.endpoint_roots[endpoint_root] += 1 stats.response_statuses[status] += 1 stats.event_types[event_type] += 1 stats.actions[action] += 1 stats.type_action_combo[combo] += 1 stats.company_ids[company_id] += 1 stats.source_members[source_member] += 1 stats.actor_members[actor_member] += 1 stats.entity_updated_by[updated_by] += 1 if timestamp: bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC") stats.endpoint_by_hour[endpoint_root][bucket] += 1 def analyze_file(log_path: Path) -> LogStats: stats = LogStats() with log_path.open("r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line: continue stats.total_rows += 1 try: row = json.loads(line) except json.JSONDecodeError: stats.invalid_rows += 1 continue stats.parsed_rows += 1 update_stats(stats, row) return stats def main() -> None: parser = argparse.ArgumentParser( description="Analyze webhook JSONL logs by users, time, and request types." ) parser.add_argument("log_file", help="Path to JSONL log file") parser.add_argument("--top", type=int, default=10, help="Top N entries per section (default: 10)") parser.add_argument( "--busiest-minutes", type=int, default=5, help="How many top minute buckets to show (default: 5)", ) parser.add_argument( "--no-color", action="store_true", help="Disable ANSI colors", ) parser.add_argument( "--endpoint-stream", action="store_true", help="Show chronological one-line summary per webhook, similar to live test webserver logs", ) parser.add_argument( "--max-path", type=int, default=18, help="Max endpoint width in stream mode before truncation (default: 18)", ) args = parser.parse_args() log_path = Path(args.log_file) if not log_path.exists() or not log_path.is_file(): raise SystemExit(f"Log file not found: {log_path}") use_color = supports_color(not args.no_color) if args.endpoint_stream: print(endpoint_stream_summary(log_path, use_color=use_color, max_path=max(args.max_path, 10))) return stats = analyze_file(log_path) print( stats.summarize( top_n=max(args.top, 1), busiest_n=max(args.busiest_minutes, 1), use_color=use_color, ) ) if __name__ == "__main__": main()