#!/usr/bin/env python3
"""
Hottub channel health checker.

Usage:
    python check.py [channel_id]
    python check.py --url http://127.0.0.1:18080 spankbang
    python check.py --no-ytdlp          # skip yt-dlp extraction
    python check.py --workers 8         # parallel channel workers
    python check.py -v                  # verbose

Checks per channel:
  - /api/videos returns items
  - video.url resolves (follows localhost proxy redirects)
  - video.thumb resolves
  - video.formats[].url resolves with the declared http_headers
  - yt-dlp -J on video.url (page URLs only): title/duration must match
  - yt-dlp -J on each format.url that is not a direct media file
"""

import sys
import json
import subprocess
import argparse
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

try:
    import requests
except ImportError:
    sys.exit("requests is required: pip install requests")

DEFAULT_BASE = "http://127.0.0.1:18080"
VIDEOS_TO_SAMPLE = 3
YTDLP_TIMEOUT = 60
HTTP_TIMEOUT = 15

log = logging.getLogger("check")


class Results:
    """Collects the error and warning lines produced while checking channels.

    Every finding is logged immediately and also stored as "[channel] msg" so
    it can be replayed in a summary. Appends to the stored lists are
    serialized with a lock.
    """

    def __init__(self):
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self._lock = threading.Lock()

    def _record(self, bucket: list[str], level: int, channel: str, msg: str) -> None:
        """Log the finding at *level*, then append its formatted line to *bucket*."""
        log.log(level, "[%s] %s", channel, msg)
        with self._lock:
            bucket.append(f"[{channel}] {msg}")

    def err(self, channel: str, msg: str):
        """Log and store an error for *channel*."""
        self._record(self.errors, logging.ERROR, channel, msg)

    def warn(self, channel: str, msg: str):
        """Log and store a warning for *channel*."""
        self._record(self.warnings, logging.WARNING, channel, msg)

    def info(self, channel: str, msg: str):
        """Log an informational line for *channel*; nothing is stored."""
        log.info("[%s] %s", channel, msg)


_BROWSER_UA = "Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0"

# Domains known to be Cloudflare-protected and return 403/connection-refused to direct
# HTTP checks. URL reachability failures for these hosts are downgraded to warnings.
_CF_PROTECTED_HOSTS = {
    "www.camsoda.com",
    "camsoda.com",
}

# Largest tolerated gap, in seconds, between the API duration and yt-dlp's.
_DURATION_TOLERANCE_SECS = 10

# Path suffixes treated as direct media files (HLS playlists are not included).
_MEDIA_FILE_EXTS = (".mp4", ".webm", ".mkv", ".flv", ".avi", ".mov", ".ts")

# Path suffixes that disqualify a URL from being treated as a browseable page.
_NON_PAGE_EXTS = _MEDIA_FILE_EXTS + (".m3u8",)


def _is_local_url(url: str) -> bool:
    """Return True if *url* contains "127.0.0.1" or "localhost" anywhere.

    This is a plain substring test, so it also matches those words when they
    appear in a path or query string.
    """
    return "127.0.0.1" in url or "localhost" in url


def _is_cf_protected(url: str) -> bool:
    """Return True if the URL's host is known to be CF-protected."""
    from urllib.parse import urlparse

    try:
        host = urlparse(url).hostname or ""
    except (ValueError, TypeError, AttributeError):
        # Unparseable input is simply "not a known host".
        return False
    return host in _CF_PROTECTED_HOSTS


def http_ok(url: str, headers: dict | None = None) -> tuple[bool, int]:
    """Return (ok, http_status). Tries HEAD then ranged GET on 405.

    A status of 200 or 206 counts as reachable. Negative pseudo-statuses
    describe transport failures: -1 timeout, -2 connection error,
    -3 any other exception (logged at debug level).
    """
    h = {"User-Agent": _BROWSER_UA, **(headers or {})}
    try:
        r = requests.head(url, headers=h, timeout=HTTP_TIMEOUT, allow_redirects=True)
        if r.status_code in (200, 206):
            return True, r.status_code
        if r.status_code == 405:
            # HEAD not allowed: fetch only the first KiB and never read the body.
            r2 = requests.get(
                url,
                headers={**h, "Range": "bytes=0-1023"},
                timeout=HTTP_TIMEOUT,
                stream=True,
            )
            r2.close()
            return r2.status_code in (200, 206), r2.status_code
        return False, r.status_code
    except requests.exceptions.Timeout:
        return False, -1  # timeout
    except requests.exceptions.ConnectionError:
        return False, -2  # connection refused / DNS
    except Exception as e:
        log.debug("http_ok exception for %s: %s", url, e)
        return False, -3


def ytdlp_extract(url: str, extra_args: list[str] | None = None) -> tuple[Optional[dict], str]:
    """Run yt-dlp -j and return (parsed_info_or_None, stderr_text).

    -j outputs one JSON object per line; we take the first non-empty line.
    When extraction fails the first element is None and the second element
    carries yt-dlp's stderr or a short description of the failure.
    """
    cmd = (
        ["yt-dlp", "-j", "--no-warnings", "--socket-timeout", "20"]
        + (extra_args or [])
        + [url]
    )
    try:
        proc = subprocess.run(cmd, capture_output=True, timeout=YTDLP_TIMEOUT)
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        if proc.returncode != 0:
            return None, stderr
        first_line = next(
            (line for line in proc.stdout.splitlines() if line.strip()), b""
        )
        if not first_line:
            return None, "yt-dlp produced no output"
        return json.loads(first_line), stderr
    except subprocess.TimeoutExpired:
        return None, "yt-dlp timed out"
    except json.JSONDecodeError as e:
        return None, f"invalid JSON from yt-dlp: {e}"
    except FileNotFoundError:
        return None, "yt-dlp not found in PATH"
    except Exception as e:
        return None, str(e)


# Headers yt-dlp always injects regardless of site — not meaningful to compare.
_YTDLP_BUILTIN_HEADERS = frozenset(
    k.lower()
    for k in (
        "User-Agent",
        "Accept",
        "Accept-Language",
        "Accept-Encoding",
        "Accept-Charset",
        "Connection",
        "Sec-Fetch-Mode",
    )
)


def compare_format_fields(
    api_fmt: dict,
    yt_fmt: dict,
    channel: str,
    label: str,
    results: Results,
):
    """Flag differences in ext, protocol, video_ext and http_headers.

    Every difference is recorded as a warning on *results*. Header names are
    compared case-insensitively and yt-dlp's built-in headers are ignored.
    """
    for field in ("ext", "protocol", "video_ext"):
        api_val = api_fmt.get(field)
        yt_val = yt_fmt.get(field)
        # Only compare when both sides actually declare the field.
        if api_val and yt_val and api_val != yt_val:
            results.warn(
                channel,
                f"{label}: {field} mismatch — api='{api_val}' yt-dlp='{yt_val}'",
            )

    api_headers = {k.lower(): v for k, v in (api_fmt.get("http_headers") or {}).items()}
    yt_headers = {k.lower(): v for k, v in (yt_fmt.get("http_headers") or {}).items()}

    # Headers declared by the API that yt-dlp also knows about — values must match.
    for key, api_val in api_headers.items():
        if key in _YTDLP_BUILTIN_HEADERS:
            continue
        if key not in yt_headers:
            results.warn(channel, f"{label}: http_header '{key}' in api but absent in yt-dlp")
        elif yt_headers[key] != api_val:
            results.warn(
                channel,
                f"{label}: http_header '{key}' mismatch — api='{api_val}' yt-dlp='{yt_headers[key]}'",
            )

    # Non-builtin headers yt-dlp requires that the API does not declare.
    for key in yt_headers:
        if key in _YTDLP_BUILTIN_HEADERS or key in api_headers:
            continue
        results.warn(channel, f"{label}: http_header '{key}' required by yt-dlp but not declared in api")


def is_page_url(url: str) -> bool:
    """True if the URL looks like a browseable page, not a media file or local proxy."""
    if not url.startswith(("http://", "https://")):
        return False
    if _is_local_url(url):
        return False
    path = url.lower().split("?")[0]
    return not path.endswith(_NON_PAGE_EXTS)


def is_media_file_url(url: str) -> bool:
    """True if the URL directly points to a media file (not a page, not HLS)."""
    path = url.lower().split("?")[0]
    return path.endswith(_MEDIA_FILE_EXTS)


def follow_proxy_redirect(url: str) -> str:
    """If url is a localhost proxy URL, follow one redirect to get the real URL.

    Best-effort: on any failure, or when the redirect target is itself local,
    the original *url* is returned unchanged.
    """
    if not _is_local_url(url):
        return url
    try:
        r = requests.head(url, timeout=HTTP_TIMEOUT, allow_redirects=False)
        if r.status_code in (301, 302, 303, 307, 308):
            loc = r.headers.get("Location", "")
            if loc and not _is_local_url(loc):
                return loc
    except Exception as e:
        # Deliberately non-fatal, but leave a trace for debugging.
        log.debug("follow_proxy_redirect failed for %s: %s", url, e)
    return url


def titles_match(a: str, b: str) -> bool:
    """Fuzzy title comparison: first 20 chars of one appears in the other.

    Comparison is case-insensitive and ignores surrounding whitespace. If
    either title is empty the titles are considered matching.
    """
    a, b = a.lower().strip(), b.lower().strip()
    if not a or not b:
        return True
    prefix_len = min(20, len(a), len(b))
    return a[:prefix_len] in b or b[:prefix_len] in a


def _check_reachability(
    vurl: str,
    thumb: str,
    formats: list[dict],
    channel_id: str,
    label: str,
    results: Results,
) -> None:
    """HTTP-probe video.url, video.thumb and every formats[].url."""
    # video.url must not point to the hottub server itself
    if not vurl:
        results.err(channel_id, f"{label}: missing url")
    elif _is_local_url(vurl):
        results.err(channel_id, f"{label}: url points to hottub server: {vurl}")
    else:
        ok, code = http_ok(vurl)
        if ok:
            results.info(channel_id, f"{label}: url OK (HTTP {code})")
        elif _is_cf_protected(vurl):
            results.warn(channel_id, f"{label}: url unreachable HTTP={code} (CF-protected host, expected): {vurl}")
        else:
            results.err(channel_id, f"{label}: url unreachable HTTP={code}: {vurl}")

    # video.thumb
    if thumb:
        ok, code = http_ok(thumb)
        if ok:
            results.info(channel_id, f"{label}: thumb OK (HTTP {code})")
        else:
            results.err(channel_id, f"{label}: thumb unreachable HTTP={code}: {thumb}")
    else:
        results.warn(channel_id, f"{label}: no thumb")

    # video.formats[].url with declared http_headers
    for j, fmt in enumerate(formats):
        furl = fmt.get("url", "")
        fheaders: dict[str, str] = fmt.get("http_headers") or {}
        if not furl:
            results.err(channel_id, f"{label} format[{j}]: missing url")
            continue
        ok, code = http_ok(furl, headers=fheaders)
        if ok:
            results.info(channel_id, f"{label} format[{j}]: OK (HTTP {code})")
        else:
            results.err(
                channel_id,
                f"{label} format[{j}]: unreachable HTTP={code}"
                + (f" (headers={list(fheaders.keys())})" if fheaders else "")
                + f": {furl}",
            )


def _ytdlp_check_page(
    video: dict,
    page_url: str,
    formats: list[dict],
    channel_id: str,
    label: str,
    results: Results,
) -> None:
    """Run yt-dlp on the page URL and compare its metadata with the API's."""
    results.info(channel_id, f"{label}: yt-dlp extract {page_url}")
    yt, stderr = ytdlp_extract(page_url)
    if yt is None:
        if _is_cf_protected(page_url):
            results.warn(
                channel_id,
                f"{label}: yt-dlp failed for {page_url} (CF-protected host, expected)"
                + (f": {stderr[:200]}" if stderr else ""),
            )
        else:
            results.err(
                channel_id,
                f"{label}: yt-dlp failed for {page_url}"
                + (f": {stderr[:300]}" if stderr else ""),
            )
        return

    yt_title = (yt.get("title") or "").strip()
    api_title = (video.get("title") or "").strip()
    # titles_match treats an empty title on either side as a match.
    if not titles_match(yt_title, api_title):
        results.warn(
            channel_id,
            f"{label}: title mismatch — yt-dlp='{yt_title[:80]}' api='{api_title[:80]}'",
        )

    yt_dur = yt.get("duration")
    api_dur = video.get("duration") or 0
    if yt_dur is not None and api_dur:
        try:
            drift = abs(float(yt_dur) - float(api_dur))
        except (TypeError, ValueError):
            # A malformed duration should be reported, not abort the channel.
            results.warn(
                channel_id,
                f"{label}: non-numeric duration — yt-dlp={yt_dur!r} api={api_dur!r}",
            )
        else:
            if drift > _DURATION_TOLERANCE_SECS:
                results.warn(
                    channel_id,
                    f"{label}: duration mismatch — yt-dlp={yt_dur}s api={api_dur}s",
                )

    yt_fmts = yt.get("formats") or []
    yt_direct = yt.get("url")
    if not yt_fmts and not yt_direct:
        results.err(
            channel_id,
            f"{label}: yt-dlp returned no formats/url for {page_url}",
        )
    else:
        results.info(
            channel_id,
            f"{label}: yt-dlp OK — formats={len(yt_fmts)}"
            + (f" title='{yt_title[:50]}'" if yt_title else ""),
        )

    # Compare each API format against the matching yt-dlp format by URL.
    # NOTE(review): check_video only calls this helper when the API declared
    # no formats, so this loop currently never iterates; it takes effect only
    # if that guard is relaxed.
    yt_fmts_by_url = {f.get("url", ""): f for f in yt_fmts}
    # Also handle the single-format case where yt-dlp puts url at top level.
    if yt_direct and not yt_fmts:
        yt_fmts_by_url[yt_direct] = yt
    for j, api_fmt in enumerate(formats):
        furl = api_fmt.get("url", "")
        if not furl:
            continue
        yt_fmt = yt_fmts_by_url.get(furl)
        if yt_fmt is None:
            results.warn(
                channel_id,
                f"{label} format[{j}]: url not found in yt-dlp formats — {furl}",
            )
        else:
            compare_format_fields(api_fmt, yt_fmt, channel_id, f"{label} format[{j}]", results)


def _ytdlp_check_formats(
    formats: list[dict],
    channel_id: str,
    label: str,
    results: Results,
) -> None:
    """Run yt-dlp on each format URL that is not a direct media file."""
    for j, fmt in enumerate(formats):
        furl = fmt.get("url", "")
        if not furl or is_media_file_url(furl):
            continue
        fheaders: dict[str, str] = fmt.get("http_headers") or {}
        # Forward the headers declared by the API to yt-dlp.
        extra_args: list[str] = []
        for hk, hv in fheaders.items():
            extra_args += ["--add-header", f"{hk}:{hv}"]
        results.info(channel_id, f"{label} format[{j}]: yt-dlp extract {furl}")
        yt, stderr = ytdlp_extract(furl, extra_args=extra_args)
        if yt is None:
            results.err(
                channel_id,
                f"{label} format[{j}]: yt-dlp failed for {furl}"
                + (f": {stderr[:200]}" if stderr else ""),
            )
            continue
        yt_fmts = yt.get("formats") or []
        if not yt_fmts and not yt.get("url"):
            results.err(
                channel_id,
                f"{label} format[{j}]: yt-dlp returned no formats/url for {furl}",
            )
        else:
            results.info(
                channel_id,
                f"{label} format[{j}]: yt-dlp OK — formats={len(yt_fmts)}",
            )


def check_video(video: dict, channel_id: str, results: Results, run_ytdlp: bool):
    """Run all checks for one video item returned by /api/videos.

    Reachability of url, thumb and formats[].url is always checked. When
    *run_ytdlp* is true, yt-dlp is additionally run on the page URL (only
    when the item declares no formats) and on every non-media format URL.
    Findings are recorded on *results*; nothing is returned.
    """
    vid_id = video.get("id", "?")
    label = f"id={vid_id}"
    vurl = video.get("url", "")
    thumb = video.get("thumb", "")
    formats: list[dict] = video.get("formats") or []

    _check_reachability(vurl, thumb, formats, channel_id, label, results)

    if not run_ytdlp:
        return

    # yt-dlp info extraction on video.url (page URLs only, skipped when formats are provided).
    # is_page_url already rejects local-server URLs.
    if not formats and vurl and is_page_url(vurl):
        _ytdlp_check_page(video, vurl, formats, channel_id, label, results)

    # yt-dlp on format URLs that are not direct media files
    _ytdlp_check_formats(formats, channel_id, label, results)


def check_channel(channel_id: str, channel_name: str, base: str, results: Results, run_ytdlp: bool):
    """Fetch a sample of videos for one channel and check each of them.

    Posts to {base}/api/videos asking for VIDEOS_TO_SAMPLE items sorted by
    date. Request, status and parse failures, as well as an empty item list,
    are recorded as errors on *results*.
    """
    results.info(channel_id, f"--- start '{channel_name}' ---")
    try:
        resp = requests.post(
            f"{base}/api/videos",
            json={
                "channel": channel_id,
                "sort": "date",
                "page": "1",
                "perPage": str(VIDEOS_TO_SAMPLE),
            },
            timeout=30,
        )
    except Exception as e:
        results.err(channel_id, f"videos request failed: {e}")
        return

    if resp.status_code != 200:
        results.err(channel_id, f"videos endpoint returned HTTP {resp.status_code}")
        return

    try:
        data = resp.json()
    except Exception as e:
        results.err(channel_id, f"could not parse videos response: {e}")
        return

    # A JSON body that is not an object has no "items"; treat it as empty.
    items: list[dict] = (data.get("items") if isinstance(data, dict) else None) or []
    if not items:
        results.err(channel_id, "no items returned by /api/videos")
        return

    results.info(channel_id, f"{len(items)} item(s) returned")
    for video in items[:VIDEOS_TO_SAMPLE]:
        check_video(video, channel_id, results, run_ytdlp)


def _check_channel_safely(ch: dict, base: str, results: Results, run_ytdlp: bool) -> None:
    """Run check_channel for *ch*, recording any unexpected exception as an error."""
    try:
        check_channel(ch["id"], ch.get("name", ch["id"]), base, results, run_ytdlp)
    except Exception as e:
        ch_id = str(ch.get("id") or "?") if isinstance(ch, dict) else "?"
        results.err(ch_id, f"unexpected exception: {e}")


def main():
    """Parse the command line, run the checks and exit with a status code.

    Exits 1 when /api/status is unusable or when any error was recorded,
    otherwise 0 (warnings alone do not fail the run).
    """
    parser = argparse.ArgumentParser(description="Hottub channel health checker")
    parser.add_argument("channel", nargs="?", help="single channel id to test")
    parser.add_argument("--url", default=DEFAULT_BASE, metavar="BASE_URL",
                        help=f"server base URL (default: {DEFAULT_BASE})")
    parser.add_argument("--no-ytdlp", action="store_true",
                        help="skip yt-dlp extraction checks")
    parser.add_argument("--workers", type=int, default=4,
                        help="parallel channel workers (default: 4)")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="show INFO log lines")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        format="%(asctime)s %(levelname)-7s %(message)s",
        datefmt="%H:%M:%S",
    )
    # Always print ERRORs and WARNINGs; INFO only in verbose mode
    logging.getLogger("check").setLevel(
        logging.INFO if args.verbose else logging.WARNING
    )

    base = args.url.rstrip("/")
    run_ytdlp = not args.no_ytdlp
    results = Results()

    channels: list[dict]
    if args.channel:
        # Single channel: skip status, go directly to the channel check
        channels = [{"id": args.channel, "name": args.channel}]
    else:
        # No channel specified: check status first, then all channels
        print(f"checking {base}/api/status ...")
        try:
            resp = requests.get(f"{base}/api/status", timeout=15)
        except Exception as e:
            print(f"ERROR: /api/status unreachable: {e}")
            sys.exit(1)
        if resp.status_code != 200:
            print(f"ERROR: /api/status returned HTTP {resp.status_code}")
            sys.exit(1)
        try:
            status_data = resp.json()
        except Exception as e:
            print(f"ERROR: could not parse /api/status response: {e}")
            sys.exit(1)

        # A JSON body that is not an object has no "channels"; treat it as empty.
        channels = (status_data.get("channels") if isinstance(status_data, dict) else None) or []
        if not channels:
            print("ERROR: no channels in /api/status response")
            sys.exit(1)
        print(f"status OK — {len(channels)} channels")

    # --- per-channel checks ---
    if len(channels) == 1 or args.workers <= 1:
        for ch in channels:
            _check_channel_safely(ch, base, results, run_ytdlp)
    else:
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            futs = [
                pool.submit(_check_channel_safely, ch, base, results, run_ytdlp)
                for ch in channels
            ]
            for fut in as_completed(futs):
                fut.result()

    # --- summary ---
    print()
    if not results.errors and not results.warnings:
        print(f"All checks passed ({len(channels)} channel(s) tested).")
        sys.exit(0)

    if results.errors:
        print(f"=== {len(results.errors)} error(s) ===")
        for e in results.errors:
            print(f"  {e}")

    if results.warnings:
        print(f"=== {len(results.warnings)} warning(s) ===")
        for w in results.warnings:
            print(f"  {w}")

    sys.exit(1 if results.errors else 0)


if __name__ == "__main__":
    main()