#!/usr/bin/env python3
import argparse
import json
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
def parse_iso(value: str | None) -> datetime | None:
|
|
if not value:
|
|
return None
|
|
normalized = value.replace("Z", "+00:00")
|
|
try:
|
|
return datetime.fromisoformat(normalized)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def first_non_empty(*values: Any) -> str:
    """Return the first value that is neither None nor a blank string.

    The winner is coerced to ``str``; "unknown" is the fallback when
    every candidate is missing or blank.
    """
    usable = (
        candidate
        for candidate in values
        if candidate is not None
        and not (isinstance(candidate, str) and candidate.strip() == "")
    )
    return str(next(usable, "unknown"))
def top_lines(counter: Counter[str], limit: int) -> list[str]:
    """Render the *limit* most common entries as "key: value" strings."""
    rendered: list[str] = []
    for key, count in counter.most_common(limit):
        rendered.append(f"{key}: {count}")
    return rendered
def fmt_pct(part: int, total: int) -> str:
    """Format part/total as a one-decimal percentage; zero total yields "0.0%"."""
    ratio = (part / total) if total else 0.0
    return f"{ratio * 100:.1f}%"
def human_duration(start: datetime | None, end: datetime | None) -> str:
|
|
if start is None or end is None:
|
|
return "unknown"
|
|
|
|
delta = end - start
|
|
total_seconds = int(delta.total_seconds())
|
|
hours, remainder = divmod(total_seconds, 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
return f"{hours}h {minutes}m {seconds}s"
|
|
|
|
|
|
def truncate(value: str, max_len: int = 90) -> str:
    """Clip value to max_len characters, replacing the tail with an ellipsis."""
    return value if len(value) <= max_len else value[: max_len - 1] + "…"
def add_section(lines: list[str], title: str) -> None:
    """Append a blank spacer, the title, and a dashed underline of equal width."""
    lines.extend(["", title, "-" * len(title)])
def supports_color(enabled: bool) -> bool:
    """Gate for ANSI output; currently just echoes the caller's preference."""
    return bool(enabled)
def paint(text: str, code: str, use_color: bool) -> str:
    """Wrap text in an ANSI SGR escape (*code*) when coloring is enabled."""
    if use_color:
        return f"\033[{code}m{text}\033[0m"
    return text
def good_bad_neutral(value: str, state: str, use_color: bool) -> str:
    """Colorize value by semantic state: good→green, bad→red, anything else→cyan."""
    code_by_state = {"good": "32", "bad": "31"}
    return paint(value, code_by_state.get(state, "36"), use_color)
def add_ranked_counter(
    lines: list[str],
    title: str,
    counter: Counter[str],
    top_n: int,
    total: int,
    truncate_labels: bool = False,
) -> None:
    """Append a bulleted, ranked breakdown of *counter* onto *lines*.

    Shows the top_n most common keys with their counts and share of
    *total*; labels may be clipped via truncate(). An empty counter
    renders a single "(no data)" row.
    """
    lines.append(f"• {title}")
    ranked = counter.most_common(top_n)
    if not ranked:
        lines.append("  (no data)")
        return

    for rank, (key, count) in enumerate(ranked, start=1):
        shown = truncate(key) if truncate_labels else key
        lines.append(f"  {rank:>2}. {shown:<90} {count:>4} {fmt_pct(count, total):>6}")
def stream_row_summary(row: dict[str, Any], use_color: bool, max_path: int) -> str:
    """Render one parsed webhook row as a single aligned stream line.

    Missing fields fall back to "unknown" via first_non_empty(); a
    missing or unparseable timestamp renders as "--:--:--".
    """
    req = row.get("request") or {}
    resp = row.get("response") or {}
    body = req.get("bodyParsed") or {}
    entity = req.get("entityParsed") or {}
    summary = req.get("summary") or {}
    query = req.get("query", {})
    params = query.get("params", {})

    when = parse_iso(row.get("timestamp"))
    clock = when.astimezone(timezone.utc).strftime("%H:%M:%S") if when else "--:--:--"

    verb = first_non_empty(req.get("method"))
    raw_path = first_non_empty(req.get("path"))
    endpoint = raw_path.partition("?")[0]
    status_code = first_non_empty(resp.get("status"))

    event_type = first_non_empty(body.get("Type"), summary.get("type"))
    action = first_non_empty(body.get("Action"), summary.get("action"), params.get("action"))
    item_id = first_non_empty(body.get("ID"), summary.get("id"), query.get("inferredId"))
    actor = first_non_empty(params.get("memberId"), summary.get("entityUpdatedBy"), entity.get("UpdatedBy"))
    entity_status = first_non_empty(entity.get("StatusName"), summary.get("entityStatus"))

    # NOTE: 2xx is "good", everything else (including "unknown") is "bad".
    is_success = status_code.startswith("2")
    status_cell = good_bad_neutral(status_code, "good" if is_success else "bad", use_color)
    event_cell = paint(f"{event_type}.{action}", "36", use_color)
    endpoint_cell = paint(truncate(endpoint, max_path), "94", use_color)

    return (
        f"[{clock}] {verb:<4} {endpoint_cell:<20} "
        f"{status_cell:>3} {event_cell:<22} "
        f"id={item_id:<7} actor={truncate(actor, 16):<16} status={truncate(entity_status, 22)}"
    )
def endpoint_stream_summary(log_path: Path, use_color: bool, max_path: int) -> str:
    """Build the chronological one-line-per-webhook stream view for a JSONL log.

    Blank lines are skipped silently; lines that fail JSON decoding are
    tallied and reported in the trailing "events=… invalid=…" footer.
    """
    divider = paint("────────────────────────────────────────────────────────────────────────────────────────────", "90", use_color)
    out: list[str] = [
        paint("ENDPOINT STREAM (chronological)", "1;95", use_color),
        divider,
    ]

    rendered = 0
    broken = 0
    with log_path.open("r", encoding="utf-8") as fh:
        for raw in fh:
            text = raw.strip()
            if not text:
                continue

            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                broken += 1
                continue

            out.append(stream_row_summary(parsed, use_color=use_color, max_path=max_path))
            rendered += 1

    out.append(divider)
    out.append(
        f"events={rendered} invalid={good_bad_neutral(str(broken), 'good' if broken == 0 else 'bad', use_color)}"
    )
    return "\n".join(out)
@dataclass
|
|
class LogStats:
|
|
total_rows: int = 0
|
|
parsed_rows: int = 0
|
|
invalid_rows: int = 0
|
|
earliest: datetime | None = None
|
|
latest: datetime | None = None
|
|
|
|
methods: Counter[str] = None # type: ignore[assignment]
|
|
paths: Counter[str] = None # type: ignore[assignment]
|
|
endpoint_roots: Counter[str] = None # type: ignore[assignment]
|
|
response_statuses: Counter[str] = None # type: ignore[assignment]
|
|
event_types: Counter[str] = None # type: ignore[assignment]
|
|
actions: Counter[str] = None # type: ignore[assignment]
|
|
type_action_combo: Counter[str] = None # type: ignore[assignment]
|
|
company_ids: Counter[str] = None # type: ignore[assignment]
|
|
|
|
source_members: Counter[str] = None # type: ignore[assignment]
|
|
actor_members: Counter[str] = None # type: ignore[assignment]
|
|
entity_updated_by: Counter[str] = None # type: ignore[assignment]
|
|
|
|
requests_by_hour: Counter[str] = None # type: ignore[assignment]
|
|
requests_by_minute: Counter[str] = None # type: ignore[assignment]
|
|
endpoint_by_hour: dict[str, Counter[str]] = None # type: ignore[assignment]
|
|
|
|
def __post_init__(self) -> None:
|
|
self.methods = Counter()
|
|
self.paths = Counter()
|
|
self.endpoint_roots = Counter()
|
|
self.response_statuses = Counter()
|
|
self.event_types = Counter()
|
|
self.actions = Counter()
|
|
self.type_action_combo = Counter()
|
|
self.company_ids = Counter()
|
|
|
|
self.source_members = Counter()
|
|
self.actor_members = Counter()
|
|
self.entity_updated_by = Counter()
|
|
|
|
self.requests_by_hour = Counter()
|
|
self.requests_by_minute = Counter()
|
|
self.endpoint_by_hour = defaultdict(Counter)
|
|
|
|
def add_timestamp(self, timestamp: datetime | None) -> None:
|
|
if timestamp is None:
|
|
return
|
|
|
|
self.earliest = timestamp if self.earliest is None else min(self.earliest, timestamp)
|
|
self.latest = timestamp if self.latest is None else max(self.latest, timestamp)
|
|
|
|
hour_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC")
|
|
minute_bucket = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
|
self.requests_by_hour[hour_bucket] += 1
|
|
self.requests_by_minute[minute_bucket] += 1
|
|
|
|
def summarize(self, top_n: int, busiest_n: int, use_color: bool) -> str:
|
|
duration_line = human_duration(self.earliest, self.latest)
|
|
time_range_line = "unknown"
|
|
if self.earliest and self.latest:
|
|
time_range_line = f"{self.earliest.isoformat()} → {self.latest.isoformat()}"
|
|
|
|
total_requests = self.parsed_rows
|
|
success_count = self.response_statuses.get("200", 0)
|
|
success_pct = fmt_pct(success_count, sum(self.response_statuses.values()))
|
|
invalid_state = "good" if self.invalid_rows == 0 else "bad"
|
|
|
|
top_endpoints = self.endpoint_roots.most_common(2)
|
|
top_users = self.actor_members.most_common(3)
|
|
top_minutes = self.requests_by_minute.most_common(busiest_n)
|
|
|
|
lines: list[str] = []
|
|
lines.append(paint("WEBHOOK SNAPSHOT", "1;95", use_color))
|
|
lines.append(paint("────────────────────────────────────────────────────────", "90", use_color))
|
|
lines.append(
|
|
" "
|
|
+ paint("Rows", "1;97", use_color)
|
|
+ f": {self.total_rows:<4} "
|
|
+ paint("Parsed", "1;97", use_color)
|
|
+ f": {self.parsed_rows:<4} "
|
|
+ paint("Invalid", "1;97", use_color)
|
|
+ f": {good_bad_neutral(str(self.invalid_rows), invalid_state, use_color)}"
|
|
)
|
|
lines.append(
|
|
" "
|
|
+ paint("Window", "1;97", use_color)
|
|
+ f": {duration_line:<12} "
|
|
+ paint("Success", "1;97", use_color)
|
|
+ f": {good_bad_neutral(success_pct, 'good' if success_count else 'neutral', use_color)}"
|
|
)
|
|
lines.append(" " + paint("UTC Range", "1;97", use_color) + f": {time_range_line}")
|
|
|
|
lines.append("")
|
|
lines.append(paint("Top Endpoints", "1;94", use_color))
|
|
if top_endpoints:
|
|
for endpoint, count in top_endpoints:
|
|
lines.append(f" • {endpoint:<14} {count:>4} ({fmt_pct(count, total_requests)})")
|
|
if not top_endpoints:
|
|
lines.append(" • (no data)")
|
|
|
|
lines.append("")
|
|
lines.append(paint("Most Active Users (query memberId)", "1;94", use_color))
|
|
if top_users:
|
|
for user, count in top_users:
|
|
lines.append(f" • {user:<18} {count:>4} ({fmt_pct(count, total_requests)})")
|
|
if not top_users:
|
|
lines.append(" • (no data)")
|
|
|
|
lines.append("")
|
|
lines.append(paint("Busiest Minutes", "1;94", use_color))
|
|
if top_minutes:
|
|
for minute, count in top_minutes:
|
|
lines.append(f" • {minute:<22} {count:>3}")
|
|
if not top_minutes:
|
|
lines.append(" • (no data)")
|
|
|
|
lines.append("")
|
|
lines.append(paint("Request Mix", "1;94", use_color))
|
|
method_line = ", ".join([f"{k}:{v}" for k, v in self.methods.most_common(3)]) or "(no data)"
|
|
event_line = ", ".join([f"{k}:{v}" for k, v in self.event_types.most_common(3)]) or "(no data)"
|
|
action_line = ", ".join([f"{k}:{v}" for k, v in self.actions.most_common(3)]) or "(no data)"
|
|
lines.append(f" • Methods : {method_line}")
|
|
lines.append(f" • Types : {event_line}")
|
|
lines.append(f" • Actions : {action_line}")
|
|
|
|
lines.append("")
|
|
lines.append(paint("Status Codes", "1;94", use_color))
|
|
if self.response_statuses:
|
|
status_total = sum(self.response_statuses.values())
|
|
for status, count in self.response_statuses.most_common(5):
|
|
state = "good" if status.startswith("2") else "bad"
|
|
status_label = good_bad_neutral(status, state, use_color)
|
|
lines.append(f" • {status_label}: {count} ({fmt_pct(count, status_total)})")
|
|
if not self.response_statuses:
|
|
lines.append(" • (no data)")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def update_stats(stats: LogStats, row: dict[str, Any]) -> None:
    """Fold one parsed JSONL row into the running LogStats counters."""
    stamp = parse_iso(row.get("timestamp"))
    stats.add_timestamp(stamp)

    req = row.get("request") or {}
    resp = row.get("response") or {}
    body = req.get("bodyParsed") or {}
    entity = req.get("entityParsed") or {}
    summary = req.get("summary", {})
    params = req.get("query", {}).get("params", {})

    verb = first_non_empty(req.get("method"))
    raw_path = first_non_empty(req.get("path"))
    root = raw_path.partition("?")[0]
    status = first_non_empty(resp.get("status"))

    # Event identity: webhook body wins, then the precomputed summary,
    # then (for action only) the query-string parameter.
    event_type = first_non_empty(body.get("Type"), summary.get("type"))
    action = first_non_empty(body.get("Action"), summary.get("action"), params.get("action"))

    source_member = first_non_empty(body.get("MemberId"), summary.get("memberId"))
    actor_member = first_non_empty(params.get("memberId"), summary.get("entityUpdatedBy"))
    updated_by = first_non_empty(entity.get("UpdatedBy"), summary.get("entityUpdatedBy"))
    company = first_non_empty(body.get("CompanyId"), req.get("headers", {}).get("companyname"))

    stats.methods[verb] += 1
    stats.paths[raw_path] += 1
    stats.endpoint_roots[root] += 1
    stats.response_statuses[status] += 1
    stats.event_types[event_type] += 1
    stats.actions[action] += 1
    stats.type_action_combo[f"{event_type}:{action}"] += 1
    stats.company_ids[company] += 1

    stats.source_members[source_member] += 1
    stats.actor_members[actor_member] += 1
    stats.entity_updated_by[updated_by] += 1

    if stamp:
        hour_key = stamp.astimezone(timezone.utc).strftime("%Y-%m-%d %H:00 UTC")
        stats.endpoint_by_hour[root][hour_key] += 1
def analyze_file(log_path: Path) -> LogStats:
    """Stream a JSONL log line by line and aggregate it into a LogStats.

    Blank lines are ignored entirely; lines that fail JSON decoding are
    counted as invalid but do not abort the run.
    """
    result = LogStats()

    with log_path.open("r", encoding="utf-8") as fh:
        for raw in fh:
            stripped = raw.strip()
            if not stripped:
                continue

            result.total_rows += 1
            try:
                payload = json.loads(stripped)
            except json.JSONDecodeError:
                result.invalid_rows += 1
                continue

            result.parsed_rows += 1
            update_stats(result, payload)

    return result
def main() -> None:
    """CLI entry point: parse arguments, then print the stream or snapshot view."""
    parser = argparse.ArgumentParser(
        description="Analyze webhook JSONL logs by users, time, and request types."
    )
    parser.add_argument("log_file", help="Path to JSONL log file")
    parser.add_argument("--top", type=int, default=10, help="Top N entries per section (default: 10)")
    parser.add_argument(
        "--busiest-minutes",
        type=int,
        default=5,
        help="How many top minute buckets to show (default: 5)",
    )
    parser.add_argument(
        "--no-color",
        action="store_true",
        help="Disable ANSI colors",
    )
    parser.add_argument(
        "--endpoint-stream",
        action="store_true",
        help="Show chronological one-line summary per webhook, similar to live test webserver logs",
    )
    parser.add_argument(
        "--max-path",
        type=int,
        default=18,
        help="Max endpoint width in stream mode before truncation (default: 18)",
    )
    options = parser.parse_args()

    target = Path(options.log_file)
    if not (target.exists() and target.is_file()):
        raise SystemExit(f"Log file not found: {target}")

    colorize = supports_color(not options.no_color)

    # Stream mode short-circuits the aggregate report entirely.
    if options.endpoint_stream:
        print(endpoint_stream_summary(target, use_color=colorize, max_path=max(options.max_path, 10)))
        return

    # Clamp user-supplied limits to at least 1 so the report always has rows.
    report = analyze_file(target).summarize(
        top_n=max(options.top, 1),
        busiest_n=max(options.busiest_minutes, 1),
        use_color=colorize,
    )
    print(report)
# Run the CLI only when executed as a script (not on import).
if __name__ == "__main__":
    main()