Files
hottub/check.py
2026-05-22 10:26:05 +00:00

521 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Hottub channel health checker.
Usage:
python check.py [channel_id]
python check.py --url http://127.0.0.1:18080 spankbang
python check.py --no-ytdlp # skip yt-dlp extraction
python check.py --workers 8 # parallel channel workers
python check.py -v # verbose
Checks per channel:
- /api/videos returns items
- video.url is reachable and does not point back at the hottub server
- video.thumb resolves
- video.formats[].url resolves with the declared http_headers
- yt-dlp -j on video.url (page URLs only, when the API declares no formats): a duration mismatch is reported as a warning
- yt-dlp -j on each format.url that is not a direct media file
"""
import sys
import json
import subprocess
import argparse
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional
try:
import requests
except ImportError:
sys.exit("requests is required: pip install requests")
# Base URL of the hottub server, used when --url is not given.
DEFAULT_BASE = "http://127.0.0.1:18080"
# Number of videos requested from /api/videos and checked per channel.
VIDEOS_TO_SAMPLE = 3
# Timeout (seconds) passed to subprocess.run() for each yt-dlp invocation.
YTDLP_TIMEOUT = 60
# Timeout (seconds) passed to requests for each reachability probe.
HTTP_TIMEOUT = 15
# Module-wide logger; main() sets its level depending on -v.
log = logging.getLogger("check")
class Results:
    """Thread-safe collector for the findings produced by the checks.

    Every finding is sent to the module logger. Errors and warnings are
    additionally stored as ``"[channel] message"`` strings so they can be
    listed in the final summary.
    """

    def __init__(self):
        # Guards both lists; check_channel may run on several worker threads.
        self._lock = threading.Lock()
        self.errors: list[str] = []
        self.warnings: list[str] = []

    def _store(self, bucket: list[str], channel: str, msg: str) -> None:
        """Append the formatted entry to *bucket* while holding the lock."""
        formatted = f"[{channel}] {msg}"
        with self._lock:
            bucket.append(formatted)

    def err(self, channel: str, msg: str):
        """Log *msg* at ERROR level and record it as an error for *channel*."""
        log.error("[%s] %s", channel, msg)
        self._store(self.errors, channel, msg)

    def warn(self, channel: str, msg: str):
        """Log *msg* at WARNING level and record it as a warning for *channel*."""
        log.warning("[%s] %s", channel, msg)
        self._store(self.warnings, channel, msg)

    def info(self, channel: str, msg: str):
        """Log *msg* at INFO level; informational lines are not stored."""
        log.info("[%s] %s", channel, msg)
# Default User-Agent sent by http_ok(); per-format http_headers may override it.
_BROWSER_UA = "Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0"
# Domains known to be Cloudflare-protected and return 403/connection-refused to direct
# HTTP checks. URL reachability failures for these hosts are downgraded to warnings.
# Matching is by exact hostname, so each subdomain must be listed explicitly.
_CF_PROTECTED_HOSTS = {
    "www.camsoda.com",
    "camsoda.com",
}
def _is_cf_protected(url: str) -> bool:
    """Tell whether *url* targets a host listed in ``_CF_PROTECTED_HOSTS``.

    The hostname must match an entry exactly. Any failure while parsing the
    URL is treated as "not protected".
    """
    try:
        from urllib.parse import urlparse

        hostname = urlparse(url).hostname
        if not hostname:
            hostname = ""
        return hostname in _CF_PROTECTED_HOSTS
    except Exception:
        return False
def http_ok(url: str, headers: dict | None = None) -> tuple[bool, int]:
    """Probe *url* and report whether it answered with HTTP 200 or 206.

    A HEAD request (following redirects) is sent first. Only when the server
    answers 405 is a ranged GET for the first 1024 bytes tried instead.

    Args:
        url: address to probe.
        headers: extra request headers; they override the default User-Agent.

    Returns:
        ``(ok, status)``. When no HTTP status is available the second element
        is a negative sentinel: -1 timeout, -2 connection error, -3 any other
        exception.
    """
    accepted = (200, 206)
    request_headers = {"User-Agent": _BROWSER_UA}
    request_headers.update(headers or {})
    try:
        head_resp = requests.head(
            url, headers=request_headers, timeout=HTTP_TIMEOUT, allow_redirects=True
        )
        head_status = head_resp.status_code
        if head_status in accepted:
            return True, head_status
        if head_status != 405:
            return False, head_status

        # HEAD is not allowed: fetch only a small byte range instead.
        ranged_headers = dict(request_headers)
        ranged_headers["Range"] = "bytes=0-1023"
        get_resp = requests.get(
            url,
            headers=ranged_headers,
            timeout=HTTP_TIMEOUT,
            stream=True,
        )
        # stream=True defers the body download; close without reading it.
        get_resp.close()
        get_status = get_resp.status_code
        return get_status in accepted, get_status
    except requests.exceptions.Timeout:
        return False, -1  # timeout
    except requests.exceptions.ConnectionError:
        return False, -2  # connection refused / DNS
    except Exception as exc:
        log.debug("http_ok exception for %s: %s", url, exc)
        return False, -3
def ytdlp_extract(url: str, extra_args: list[str] | None = None) -> tuple[Optional[dict], str]:
"""Run yt-dlp -j and return (parsed_info_or_None, stderr_text).
-j outputs one JSON object per line; we take the first non-empty line.
"""
cmd = (
["yt-dlp", "-j", "--no-warnings", "--socket-timeout", "20"]
+ (extra_args or [])
+ [url]
)
try:
proc = subprocess.run(cmd, capture_output=True, timeout=YTDLP_TIMEOUT)
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
if proc.returncode != 0:
return None, stderr
first_line = next(
(l for l in proc.stdout.splitlines() if l.strip()), b""
)
if not first_line:
return None, "yt-dlp produced no output"
info = json.loads(first_line)
return info, stderr
except subprocess.TimeoutExpired:
return None, "yt-dlp timed out"
except json.JSONDecodeError as e:
return None, f"invalid JSON from yt-dlp: {e}"
except FileNotFoundError:
return None, "yt-dlp not found in PATH"
except Exception as e:
return None, str(e)
# Headers yt-dlp always injects regardless of site — not meaningful to compare.
# Names are stored lowercased because compare_format_fields() lowercases the
# header names on both sides before looking them up here.
_YTDLP_BUILTIN_HEADERS = frozenset(
    k.lower() for k in (
        "User-Agent", "Accept", "Accept-Language", "Accept-Encoding",
        "Accept-Charset", "Connection", "Sec-Fetch-Mode",
    )
)
def compare_format_fields(
    api_fmt: dict,
    yt_fmt: dict,
    channel: str,
    label: str,
    results: Results,
):
    """Warn about differences between an API format and its yt-dlp counterpart.

    Compared are the ``ext``, ``protocol`` and ``video_ext`` fields (only when
    both sides carry a truthy value) and the ``http_headers`` mappings, whose
    names are matched case-insensitively. Headers listed in
    ``_YTDLP_BUILTIN_HEADERS`` are ignored. Every finding is reported through
    ``results.warn``; nothing is returned.
    """

    def _lowered_headers(fmt: dict) -> dict:
        raw = fmt.get("http_headers") or {}
        return {name.lower(): value for name, value in raw.items()}

    for field in ("ext", "protocol", "video_ext"):
        ours, theirs = api_fmt.get(field), yt_fmt.get(field)
        if not ours or not theirs or ours == theirs:
            continue
        results.warn(
            channel,
            f"{label}: {field} mismatch — api='{ours}' yt-dlp='{theirs}'",
        )

    api_headers = _lowered_headers(api_fmt)
    yt_headers = _lowered_headers(yt_fmt)

    # Headers declared by the API: yt-dlp must know them with the same value.
    for name, ours in api_headers.items():
        if name in _YTDLP_BUILTIN_HEADERS:
            continue
        try:
            theirs = yt_headers[name]
        except KeyError:
            results.warn(channel, f"{label}: http_header '{name}' in api but absent in yt-dlp")
            continue
        if theirs != ours:
            results.warn(
                channel,
                f"{label}: http_header '{name}' mismatch — api='{ours}' yt-dlp='{theirs}'",
            )

    # Non-builtin headers yt-dlp uses that the API does not declare.
    undeclared = [
        name
        for name in yt_headers
        if name not in _YTDLP_BUILTIN_HEADERS and name not in api_headers
    ]
    for name in undeclared:
        results.warn(channel, f"{label}: http_header '{name}' required by yt-dlp but not declared in api")
def is_page_url(url: str) -> bool:
    """Return True if *url* looks like a browseable page.

    A page URL must use http(s), must not mention the local proxy
    (``127.0.0.1`` / ``localhost``) and its path must not end in a known media
    or playlist extension. Only the path component is inspected, so a query
    string, a fragment or a host name such as ``example.mov`` cannot affect
    the extension test. Malformed URLs are not considered pages.
    """
    from urllib.parse import urlsplit

    if not url.startswith(("http://", "https://")):
        return False
    if "127.0.0.1" in url or "localhost" in url:
        return False
    try:
        path = urlsplit(url).path.lower()
    except ValueError:
        # urlsplit rejects e.g. unbalanced IPv6 brackets in the host part.
        return False
    media_suffixes = (".mp4", ".m3u8", ".ts", ".webm", ".mkv", ".flv", ".avi", ".mov")
    return not path.endswith(media_suffixes)
def is_media_file_url(url: str) -> bool:
    """Return True if *url* points directly at a media file (not a page, not HLS).

    Only the path component is inspected, case-insensitively, so a query
    string, a fragment or a host name such as ``example.mov`` cannot affect
    the result. Malformed URLs are reported as not being media files.
    """
    from urllib.parse import urlsplit

    try:
        path = urlsplit(url).path.lower()
    except ValueError:
        # urlsplit rejects e.g. unbalanced IPv6 brackets in the host part.
        return False
    return path.endswith((".mp4", ".webm", ".mkv", ".flv", ".avi", ".mov", ".ts"))
def follow_proxy_redirect(url: str) -> str:
    """Resolve a localhost proxy URL to the external URL it redirects to.

    URLs that do not mention ``127.0.0.1`` or ``localhost`` are returned
    unchanged. Otherwise a single HEAD request is sent without following
    redirects. If the answer is a redirect whose target, resolved against
    *url*, is not local, that absolute target is returned. In every other
    case, including request failures, *url* itself is returned.
    """
    from urllib.parse import urljoin

    if "127.0.0.1" not in url and "localhost" not in url:
        return url
    try:
        r = requests.head(url, timeout=HTTP_TIMEOUT, allow_redirects=False)
        if r.status_code in (301, 302, 303, 307, 308):
            loc = r.headers.get("Location", "")
            if loc:
                # Location may be relative; resolve it first so that a
                # redirect staying on the proxy is not mistaken for an
                # external URL.
                target = urljoin(url, loc)
                if "127.0.0.1" not in target and "localhost" not in target:
                    return target
    except Exception as e:
        # Best effort: keep the original URL, but leave a trace like http_ok does.
        log.debug("follow_proxy_redirect exception for %s: %s", url, e)
    return url
def titles_match(a: str, b: str) -> bool:
"""Fuzzy title comparison: first 20 chars of one appears in the other."""
a, b = a.lower().strip(), b.lower().strip()
if not a or not b:
return True
prefix_len = min(20, min(len(a), len(b)))
return a[:prefix_len] in b or b[:prefix_len] in a
def _check_video_url(vurl: str, channel_id: str, label: str, results: Results) -> None:
    """Verify that video.url is present, external and reachable."""
    if not vurl:
        results.err(channel_id, f"{label}: missing url")
        return
    # video.url must not point to the hottub server itself
    if "127.0.0.1" in vurl or "localhost" in vurl:
        results.err(channel_id, f"{label}: url points to hottub server: {vurl}")
        return
    ok, code = http_ok(vurl)
    if ok:
        results.info(channel_id, f"{label}: url OK (HTTP {code})")
    elif _is_cf_protected(vurl):
        results.warn(channel_id, f"{label}: url unreachable HTTP={code} (CF-protected host, expected): {vurl}")
    else:
        results.err(channel_id, f"{label}: url unreachable HTTP={code}: {vurl}")


def _check_video_thumb(thumb: str, channel_id: str, label: str, results: Results) -> None:
    """Verify that video.thumb is reachable; a missing thumb is only a warning."""
    if not thumb:
        results.warn(channel_id, f"{label}: no thumb")
        return
    ok, code = http_ok(thumb)
    if ok:
        results.info(channel_id, f"{label}: thumb OK (HTTP {code})")
    else:
        results.err(channel_id, f"{label}: thumb unreachable HTTP={code}: {thumb}")


def _check_video_formats(formats: list[dict], channel_id: str, label: str, results: Results) -> None:
    """Verify that every formats[].url is reachable with its declared http_headers."""
    for j, fmt in enumerate(formats):
        furl = fmt.get("url", "")
        fheaders: dict[str, str] = fmt.get("http_headers") or {}
        if not furl:
            results.err(channel_id, f"{label} format[{j}]: missing url")
            continue
        ok, code = http_ok(furl, headers=fheaders)
        if ok:
            results.info(channel_id, f"{label} format[{j}]: OK (HTTP {code})")
            continue
        results.err(
            channel_id,
            f"{label} format[{j}]: unreachable HTTP={code}"
            + (f" (headers={list(fheaders.keys())})" if fheaders else "")
            + f": {furl}",
        )


def _ytdlp_check_page(video: dict, ytdlp_url: str, channel_id: str, label: str, results: Results) -> None:
    """Run yt-dlp on the page URL and compare its duration with the API's."""
    results.info(channel_id, f"{label}: yt-dlp extract {ytdlp_url}")
    yt, stderr = ytdlp_extract(ytdlp_url)
    if yt is None:
        if _is_cf_protected(ytdlp_url):
            results.warn(
                channel_id,
                f"{label}: yt-dlp failed for {ytdlp_url} (CF-protected host, expected)"
                + (f": {stderr[:200]}" if stderr else ""),
            )
        else:
            results.err(
                channel_id,
                f"{label}: yt-dlp failed for {ytdlp_url}"
                + (f": {stderr[:300]}" if stderr else ""),
            )
        return

    yt_title = (yt.get("title") or "").strip()
    yt_dur = yt.get("duration")
    api_dur = video.get("duration") or 0
    # Durations are compared only when both sides report one; 10 s tolerance.
    if yt_dur is not None and api_dur and abs(float(yt_dur) - float(api_dur)) > 10:
        results.warn(
            channel_id,
            f"{label}: duration mismatch — yt-dlp={yt_dur}s api={api_dur}s",
        )

    yt_fmts = yt.get("formats") or []
    yt_direct = yt.get("url")
    if not yt_fmts and not yt_direct:
        results.err(
            channel_id,
            f"{label}: yt-dlp returned no formats/url for {ytdlp_url}",
        )
        return
    results.info(
        channel_id,
        f"{label}: yt-dlp OK — formats={len(yt_fmts)}"
        + (f" title='{yt_title[:50]}'" if yt_title else ""),
    )


def _ytdlp_check_formats(formats: list[dict], channel_id: str, label: str, results: Results) -> None:
    """Run yt-dlp on every format URL that is not a direct media file."""
    for j, fmt in enumerate(formats):
        furl = fmt.get("url", "")
        if not furl or is_media_file_url(furl):
            continue
        fheaders: dict[str, str] = fmt.get("http_headers") or {}
        # Forward the declared headers so yt-dlp requests the URL the same way.
        extra_args: list[str] = []
        for hk, hv in fheaders.items():
            extra_args += ["--add-header", f"{hk}:{hv}"]
        results.info(channel_id, f"{label} format[{j}]: yt-dlp extract {furl}")
        yt, stderr = ytdlp_extract(furl, extra_args=extra_args)
        if yt is None:
            results.err(
                channel_id,
                f"{label} format[{j}]: yt-dlp failed for {furl}"
                + (f": {stderr[:200]}" if stderr else ""),
            )
            continue
        yt_fmts = yt.get("formats") or []
        yt_direct = yt.get("url")
        if not yt_fmts and not yt_direct:
            results.err(
                channel_id,
                f"{label} format[{j}]: yt-dlp returned no formats/url for {furl}",
            )
        else:
            results.info(
                channel_id,
                f"{label} format[{j}]: yt-dlp OK — formats={len(yt_fmts)}",
            )


def check_video(video: dict, channel_id: str, results: Results, run_ytdlp: bool):
    """Run all checks for one video item and record findings in *results*.

    Args:
        video: one entry of the ``items`` list returned by /api/videos.
        channel_id: channel the video belongs to; used to tag findings.
        results: collector receiving errors, warnings and info lines.
        run_ytdlp: when False only the HTTP reachability checks are run.

    The HTTP checks cover video.url, video.thumb and every formats[].url.
    With yt-dlp enabled, the page URL is extracted only when the API declares
    no formats. Otherwise each format URL that is not a direct media file is
    extracted instead. Since the page extraction only happens without API
    formats, there is nothing to compare yt-dlp's formats against in that
    branch.
    """
    vid_id = video.get("id", "?")
    label = f"id={vid_id}"
    vurl = video.get("url", "")
    thumb = video.get("thumb", "")
    formats: list[dict] = video.get("formats") or []

    _check_video_url(vurl, channel_id, label, results)
    _check_video_thumb(thumb, channel_id, label, results)
    _check_video_formats(formats, channel_id, label, results)

    if not run_ytdlp:
        return

    # yt-dlp info extraction on video.url (page URLs only, skipped when formats are provided)
    ytdlp_url = vurl if vurl and "127.0.0.1" not in vurl and "localhost" not in vurl else ""
    if not formats and ytdlp_url and is_page_url(ytdlp_url):
        _ytdlp_check_page(video, ytdlp_url, channel_id, label, results)

    # yt-dlp on format URLs that are not direct media files
    _ytdlp_check_formats(formats, channel_id, label, results)
def check_channel(channel_id: str, channel_name: str, base: str, results: Results, run_ytdlp: bool):
    """Fetch a sample of videos for one channel and check each of them.

    Args:
        channel_id: id sent as ``channel`` to /api/videos and used to tag findings.
        channel_name: display name, used only in the start log line.
        base: server base URL without a trailing slash.
        results: collector receiving errors, warnings and info lines.
        run_ytdlp: forwarded to check_video().

    Request failures, non-200 answers, unparsable or unexpectedly shaped JSON
    and an empty item list are recorded as errors for the channel and end the
    check early.
    """
    results.info(channel_id, f"--- start '{channel_name}' ---")
    try:
        resp = requests.post(
            f"{base}/api/videos",
            json={
                "channel": channel_id,
                "sort": "date",
                "page": "1",
                "perPage": str(VIDEOS_TO_SAMPLE),
            },
            timeout=30,
        )
    except Exception as e:
        results.err(channel_id, f"videos request failed: {e}")
        return
    if resp.status_code != 200:
        results.err(channel_id, f"videos endpoint returned HTTP {resp.status_code}")
        return
    try:
        data = resp.json()
    except Exception as e:
        results.err(channel_id, f"could not parse videos response: {e}")
        return
    # Valid JSON is not necessarily an object; reject other shapes explicitly
    # instead of failing with AttributeError on .get().
    if not isinstance(data, dict):
        results.err(
            channel_id,
            f"videos response is not a JSON object (got {type(data).__name__})",
        )
        return
    items = data.get("items") or []
    if not items:
        results.err(channel_id, "no items returned by /api/videos")
        return
    if not isinstance(items, list):
        results.err(
            channel_id,
            f"'items' in videos response is not a list (got {type(items).__name__})",
        )
        return
    results.info(channel_id, f"{len(items)} item(s) returned")
    # The server may ignore perPage, so cap the sample on this side as well.
    for idx, video in enumerate(items[:VIDEOS_TO_SAMPLE]):
        if not isinstance(video, dict):
            results.err(
                channel_id,
                f"items[{idx}] is not a JSON object (got {type(video).__name__})",
            )
            continue
        check_video(video, channel_id, results, run_ytdlp)
def main():
    """Command-line entry point: parse arguments, run the checks, print a summary.

    With a channel argument only that channel is checked. Otherwise the
    channel list is read from /api/status and every channel is checked,
    in parallel when --workers is greater than 1.

    Exits with status 1 when /api/status cannot be used or when any error was
    recorded. Warnings alone still exit with status 0.
    """
    parser = argparse.ArgumentParser(description="Hottub channel health checker")
    parser.add_argument("channel", nargs="?", help="single channel id to test")
    parser.add_argument("--url", default=DEFAULT_BASE, metavar="BASE_URL",
                        help=f"server base URL (default: {DEFAULT_BASE})")
    parser.add_argument("--no-ytdlp", action="store_true",
                        help="skip yt-dlp extraction checks")
    parser.add_argument("--workers", type=int, default=4,
                        help="parallel channel workers (default: 4)")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="show INFO log lines")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        format="%(asctime)s %(levelname)-7s %(message)s",
        datefmt="%H:%M:%S",
    )
    # Always print ERRORs and WARNINGs; INFO only in verbose mode
    logging.getLogger("check").setLevel(
        logging.INFO if args.verbose else logging.WARNING
    )

    base = args.url.rstrip("/")
    run_ytdlp = not args.no_ytdlp
    results = Results()

    if args.channel:
        # Single channel: skip status, go directly to the channel check
        channels: list[dict] = [{"id": args.channel, "name": args.channel}]
    else:
        # No channel specified: check status first, then all channels
        print(f"checking {base}/api/status ...")
        try:
            resp = requests.get(f"{base}/api/status", timeout=15)
        except Exception as e:
            print(f"ERROR: /api/status unreachable: {e}")
            sys.exit(1)
        if resp.status_code != 200:
            print(f"ERROR: /api/status returned HTTP {resp.status_code}")
            sys.exit(1)
        try:
            status_data = resp.json()
        except Exception as e:
            print(f"ERROR: could not parse /api/status response: {e}")
            sys.exit(1)
        # Valid JSON is not necessarily an object; .get() below needs a dict.
        if not isinstance(status_data, dict):
            print("ERROR: /api/status response is not a JSON object")
            sys.exit(1)
        channels = status_data.get("channels") or []
        if not channels:
            print("ERROR: no channels in /api/status response")
            sys.exit(1)
        print(f"status OK — {len(channels)} channels")

    # --- per-channel checks ---
    if len(channels) == 1 or args.workers <= 1:
        for ch in channels:
            # Same safety net as the thread-pool path: one failing channel
            # must not abort the whole run.
            try:
                check_channel(ch["id"], ch.get("name", ch["id"]), base, results, run_ytdlp)
            except Exception as e:
                results.err(ch["id"], f"unexpected exception: {e}")
    else:
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            futs = {
                pool.submit(check_channel, ch["id"], ch.get("name", ch["id"]), base, results, run_ytdlp): ch
                for ch in channels
            }
            for fut in as_completed(futs):
                try:
                    fut.result()
                except Exception as e:
                    ch = futs[fut]
                    results.err(ch["id"], f"unexpected exception: {e}")

    # --- summary ---
    print()
    if not results.errors and not results.warnings:
        print(f"All checks passed ({len(channels)} channel(s) tested).")
        sys.exit(0)
    if results.errors:
        print(f"=== {len(results.errors)} error(s) ===")
        for e in results.errors:
            print(f" {e}")
    if results.warnings:
        print(f"=== {len(results.warnings)} warning(s) ===")
        for w in results.warnings:
            print(f" {w}")
    sys.exit(1 if results.errors else 0)
# Run the checker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()