Files
optima/debug-scripts/analyze_webhook_log.py
T

442 lines
16 KiB
Python

#!/usr/bin/env python3
import argparse
import json
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
def parse_iso(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
try:
return datetime.fromisoformat(normalized)
except ValueError:
return None
def first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str) and value.strip() == "":
continue
return str(value)
return "unknown"
def top_lines(counter: Counter[str], limit: int) -> list[str]:
return [f"{k}: {v}" for k, v in counter.most_common(limit)]
def fmt_pct(part: int, total: int) -> str:
if total == 0:
return "0.0%"
return f"{(part / total) * 100:.1f}%"
def human_duration(start: datetime | None, end: datetime | None) -> str:
if start is None or end is None:
return "unknown"
delta = end - start
total_seconds = int(delta.total_seconds())
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours}h {minutes}m {seconds}s"
def truncate(value: str, max_len: int = 90) -> str:
if len(value) <= max_len:
return value
return value[: max_len - 1] + ""
def add_section(lines: list[str], title: str) -> None:
lines.append("")
lines.append(title)
lines.append("-" * len(title))
def supports_color(enabled: bool) -> bool:
return enabled
def paint(text: str, code: str, use_color: bool) -> str:
if not use_color:
return text
return f"\033[{code}m{text}\033[0m"
def good_bad_neutral(value: str, state: str, use_color: bool) -> str:
if state == "good":
return paint(value, "32", use_color)
if state == "bad":
return paint(value, "31", use_color)
return paint(value, "36", use_color)
def add_ranked_counter(
lines: list[str],
title: str,
counter: Counter[str],
top_n: int,
total: int,
truncate_labels: bool = False,
) -> None:
lines.append(f"{title}")
items = counter.most_common(top_n)
if not items:
lines.append(" (no data)")
return
for index, (key, count) in enumerate(items, start=1):
label = truncate(key) if truncate_labels else key
lines.append(f" {index:>2}. {label:<90} {count:>4} {fmt_pct(count, total):>6}")
def stream_row_summary(row: dict[str, Any], use_color: bool, max_path: int) -> str:
request = row.get("request") or {}
response = row.get("response") or {}
body_parsed = request.get("bodyParsed") or {}
entity_parsed = request.get("entityParsed") or {}
summary = request.get("summary") or {}
timestamp = parse_iso(row.get("timestamp"))
time_label = timestamp.astimezone(timezone.utc).strftime("%H:%M:%S") if timestamp else "--:--:--"
method = first_non_empty(request.get("method"))
path = first_non_empty(request.get("path"))
endpoint = path.split("?", 1)[0]
status_code = first_non_empty(response.get("status"))
event_type = first_non_empty(body_parsed.get("Type"), summary.get("type"))
action = first_non_empty(
body_parsed.get("Action"),
summary.get("action"),
request.get("query", {}).get("params", {}).get("action"),
)
item_id = first_non_empty(body_parsed.get("ID"), summary.get("id"), request.get("query", {}).get("inferredId"))
actor = first_non_empty(
request.get("query", {}).get("params", {}).get("memberId"),
summary.get("entityUpdatedBy"),
entity_parsed.get("UpdatedBy"),
)
entity_status = first_non_empty(entity_parsed.get("StatusName"), summary.get("entityStatus"))
endpoint_label = truncate(endpoint, max_path)
status_state = "good" if status_code.startswith("2") else "bad"
status_colored = good_bad_neutral(status_code, status_state, use_color)
event_colored = paint(f"{event_type}.{action}", "36", use_color)
endpoint_colored = paint(endpoint_label, "94", use_color)
return (
f"[{time_label}] {method:<4} {endpoint_colored:<20} "
f"{status_colored:>3} {event_colored:<22} "
f"id={item_id:<7} actor={truncate(actor, 16):<16} status={truncate(entity_status, 22)}"
)
def endpoint_stream_summary(log_path: Path, use_color: bool, max_path: int) -> str:
lines: list[str] = []
lines.append(paint("ENDPOINT STREAM (chronological)", "1;95", use_color))
lines.append(paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color))
count = 0
invalid = 0
with log_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
invalid += 1
continue
lines.append(stream_row_summary(row, use_color=use_color, max_path=max_path))
count += 1
lines.append(paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color))
lines.append(
f"events={count} invalid={good_bad_neutral(str(invalid), 'good' if invalid == 0 else 'bad', use_color)}"
)
return "\n".join(lines)
@dataclass
class LogStats:
total_rows: int = 0
parsed_rows: int = 0
invalid_rows: int = 0
earliest: datetime | None = None
latest: datetime | None = None
methods: Counter[str] = None # type: ignore[assignment]
paths: Counter[str] = None # type: ignore[assignment]
endpoint_roots: Counter[str] = None # type: ignore[assignment]
response_statuses: Counter[str] = None # type: ignore[assignment]
event_types: Counter[str] = None # type: ignore[assignment]
actions: Counter[str] = None # type: ignore[assignment]
type_action_combo: Counter[str] = None # type: ignore[assignment]
company_ids: Counter[str] = None # type: ignore[assignment]
source_members: Counter[str] = None # type: ignore[assignment]
actor_members: Counter[str] = None # type: ignore[assignment]
entity_updated_by: Counter[str] = None # type: ignore[assignment]
requests_by_hour: Counter[str] = None # type: ignore[assignment]
requests_by_minute: Counter[str] = None # type: ignore[assignment]
endpoint_by_hour: dict[str, Counter[str]] = None # type: ignore[assignment]
def __post_init__(self) -> None:
self.methods = Counter()
self.paths = Counter()
self.endpoint_roots = Counter()
self.response_statuses = Counter()
self.event_types = Counter()
self.actions = Counter()
self.type_action_combo = Counter()
self.company_ids = Counter()
self.source_members = Counter()
self.actor_members = Counter()
self.entity_updated_by = Counter()
self.requests_by_hour = Counter()
self.requests_by_minute = Counter()
self.endpoint_by_hour = defaultdict(Counter)
def add_timestamp(self, timestamp: datetime | None) -> None:
if timestamp is None:
return
self.earliest = timestamp if self.earliest is None else min(self.earliest, timestamp)
self.latest = timestamp if self.latest is None else max(self.latest, timestamp)
hour_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC")
minute_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
self.requests_by_hour[hour_bucket] += 1
self.requests_by_minute[minute_bucket] += 1
def summarize(self, top_n: int, busiest_n: int, use_color: bool) -> str:
duration_line = human_duration(self.earliest, self.latest)
time_range_line = "unknown"
if self.earliest and self.latest:
time_range_line = f"{self.earliest.isoformat()}{self.latest.isoformat()}"
total_requests = self.parsed_rows
success_count = self.response_statuses.get("200", 0)
success_pct = fmt_pct(success_count, sum(self.response_statuses.values()))
invalid_state = "good" if self.invalid_rows == 0 else "bad"
top_endpoints = self.endpoint_roots.most_common(2)
top_users = self.actor_members.most_common(3)
top_minutes = self.requests_by_minute.most_common(busiest_n)
lines: list[str] = []
lines.append(paint("WEBHOOK SNAPSHOT", "1;95", use_color))
lines.append(paint("────────────────────────────────────────────────────────", "90", use_color))
lines.append(
" "
+ paint("Rows", "1;97", use_color)
+ f": {self.total_rows:<4} "
+ paint("Parsed", "1;97", use_color)
+ f": {self.parsed_rows:<4} "
+ paint("Invalid", "1;97", use_color)
+ f": {good_bad_neutral(str(self.invalid_rows), invalid_state, use_color)}"
)
lines.append(
" "
+ paint("Window", "1;97", use_color)
+ f": {duration_line:<12} "
+ paint("Success", "1;97", use_color)
+ f": {good_bad_neutral(success_pct, 'good' if success_count else 'neutral', use_color)}"
)
lines.append(" " + paint("UTC Range", "1;97", use_color) + f": {time_range_line}")
lines.append("")
lines.append(paint("Top Endpoints", "1;94", use_color))
if top_endpoints:
for endpoint, count in top_endpoints:
lines.append(f"{endpoint:<14} {count:>4} ({fmt_pct(count, total_requests)})")
if not top_endpoints:
lines.append(" • (no data)")
lines.append("")
lines.append(paint("Most Active Users (query memberId)", "1;94", use_color))
if top_users:
for user, count in top_users:
lines.append(f"{user:<18} {count:>4} ({fmt_pct(count, total_requests)})")
if not top_users:
lines.append(" • (no data)")
lines.append("")
lines.append(paint("Busiest Minutes", "1;94", use_color))
if top_minutes:
for minute, count in top_minutes:
lines.append(f"{minute:<22} {count:>3}")
if not top_minutes:
lines.append(" • (no data)")
lines.append("")
lines.append(paint("Request Mix", "1;94", use_color))
method_line = ", ".join([f"{k}:{v}" for k, v in self.methods.most_common(3)]) or "(no data)"
event_line = ", ".join([f"{k}:{v}" for k, v in self.event_types.most_common(3)]) or "(no data)"
action_line = ", ".join([f"{k}:{v}" for k, v in self.actions.most_common(3)]) or "(no data)"
lines.append(f" • Methods : {method_line}")
lines.append(f" • Types : {event_line}")
lines.append(f" • Actions : {action_line}")
lines.append("")
lines.append(paint("Status Codes", "1;94", use_color))
if self.response_statuses:
status_total = sum(self.response_statuses.values())
for status, count in self.response_statuses.most_common(5):
state = "good" if status.startswith("2") else "bad"
status_label = good_bad_neutral(status, state, use_color)
lines.append(f"{status_label}: {count} ({fmt_pct(count, status_total)})")
if not self.response_statuses:
lines.append(" • (no data)")
return "\n".join(lines)
def update_stats(stats: LogStats, row: dict[str, Any]) -> None:
timestamp = parse_iso(row.get("timestamp"))
stats.add_timestamp(timestamp)
request = row.get("request") or {}
response = row.get("response") or {}
body_parsed = request.get("bodyParsed") or {}
entity_parsed = request.get("entityParsed") or {}
method = first_non_empty(request.get("method"))
path = first_non_empty(request.get("path"))
endpoint_root = path.split("?", 1)[0]
status = first_non_empty(response.get("status"))
event_type = first_non_empty(
body_parsed.get("Type"),
request.get("summary", {}).get("type"),
)
action = first_non_empty(
body_parsed.get("Action"),
request.get("summary", {}).get("action"),
request.get("query", {}).get("params", {}).get("action"),
)
combo = f"{event_type}:{action}"
source_member = first_non_empty(
body_parsed.get("MemberId"),
request.get("summary", {}).get("memberId"),
)
actor_member = first_non_empty(
request.get("query", {}).get("params", {}).get("memberId"),
request.get("summary", {}).get("entityUpdatedBy"),
)
updated_by = first_non_empty(
entity_parsed.get("UpdatedBy"),
request.get("summary", {}).get("entityUpdatedBy"),
)
company_id = first_non_empty(body_parsed.get("CompanyId"), request.get("headers", {}).get("companyname"))
stats.methods[method] += 1
stats.paths[path] += 1
stats.endpoint_roots[endpoint_root] += 1
stats.response_statuses[status] += 1
stats.event_types[event_type] += 1
stats.actions[action] += 1
stats.type_action_combo[combo] += 1
stats.company_ids[company_id] += 1
stats.source_members[source_member] += 1
stats.actor_members[actor_member] += 1
stats.entity_updated_by[updated_by] += 1
if timestamp:
bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC")
stats.endpoint_by_hour[endpoint_root][bucket] += 1
def analyze_file(log_path: Path) -> LogStats:
stats = LogStats()
with log_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
stats.total_rows += 1
try:
row = json.loads(line)
except json.JSONDecodeError:
stats.invalid_rows += 1
continue
stats.parsed_rows += 1
update_stats(stats, row)
return stats
def main() -> None:
parser = argparse.ArgumentParser(
description="Analyze webhook JSONL logs by users, time, and request types."
)
parser.add_argument("log_file", help="Path to JSONL log file")
parser.add_argument("--top", type=int, default=10, help="Top N entries per section (default: 10)")
parser.add_argument(
"--busiest-minutes",
type=int,
default=5,
help="How many top minute buckets to show (default: 5)",
)
parser.add_argument(
"--no-color",
action="store_true",
help="Disable ANSI colors",
)
parser.add_argument(
"--endpoint-stream",
action="store_true",
help="Show chronological one-line summary per webhook, similar to live test webserver logs",
)
parser.add_argument(
"--max-path",
type=int,
default=18,
help="Max endpoint width in stream mode before truncation (default: 18)",
)
args = parser.parse_args()
log_path = Path(args.log_file)
if not log_path.exists() or not log_path.is_file():
raise SystemExit(f"Log file not found: {log_path}")
use_color = supports_color(not args.no_color)
if args.endpoint_stream:
print(endpoint_stream_summary(log_path, use_color=use_color, max_path=max(args.max_path, 10)))
return
stats = analyze_file(log_path)
print(
stats.summarize(
top_n=max(args.top, 1),
busiest_n=max(args.busiest_minutes, 1),
use_color=use_color,
)
)
if __name__ == "__main__":
main()