521 lines
18 KiB
Python
521 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Hottub channel health checker.
|
|
|
|
Usage:
|
|
python check.py [channel_id]
|
|
python check.py --url http://127.0.0.1:18080 spankbang
|
|
python check.py --no-ytdlp # skip yt-dlp extraction
|
|
python check.py --workers 8 # parallel channel workers
|
|
python check.py -v # verbose
|
|
|
|
Checks per channel:
|
|
- /api/videos returns items
|
|
- video.url resolves and does not point back at the hottub server itself
|
|
- video.thumb resolves
|
|
- video.formats[].url resolves with the declared http_headers
|
|
- yt-dlp -j on video.url (page URLs only, skipped when the API declares formats): title/duration must match
|
|
- yt-dlp -j on each format.url that is not a direct media file
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
import argparse
|
|
import logging
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from typing import Optional
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
sys.exit("requests is required: pip install requests")
|
|
|
|
# Server base URL used when --url is not given on the command line.
DEFAULT_BASE = "http://127.0.0.1:18080"
# Number of videos requested from /api/videos and checked per channel.
VIDEOS_TO_SAMPLE = 3
# Seconds a single yt-dlp subprocess may run before it is abandoned.
YTDLP_TIMEOUT = 60
# Seconds allowed for each reachability HTTP request.
HTTP_TIMEOUT = 15

# Logger shared by every check in this script.
log = logging.getLogger("check")
|
|
|
|
|
|
class Results:
    """Collects the errors and warnings produced while checking channels.

    Every recorded entry has the form ``"[<channel>] <message>"`` and is also
    sent to the module logger at the matching level. Appends are guarded by a
    lock because channels may be checked from several worker threads.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.errors: list[str] = []
        self.warnings: list[str] = []

    def _store(self, bucket: list[str], channel: str, msg: str) -> None:
        """Append the formatted entry to *bucket* while holding the lock."""
        formatted = f"[{channel}] {msg}"
        with self._lock:
            bucket.append(formatted)

    def err(self, channel: str, msg: str):
        """Log *msg* as an error and remember it for the final summary."""
        log.error("[%s] %s", channel, msg)
        self._store(self.errors, channel, msg)

    def warn(self, channel: str, msg: str):
        """Log *msg* as a warning and remember it for the final summary."""
        log.warning("[%s] %s", channel, msg)
        self._store(self.warnings, channel, msg)

    def info(self, channel: str, msg: str):
        """Log *msg* at INFO level; nothing is stored."""
        log.info("[%s] %s", channel, msg)
|
|
|
|
|
|
# Default User-Agent for reachability requests (callers may override it).
_BROWSER_UA = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:146.0) "
    "Gecko/20100101 Firefox/146.0"
)

# Domains known to be Cloudflare-protected and return 403/connection-refused to
# direct HTTP checks. URL reachability failures for these hosts are downgraded
# to warnings.
_CF_PROTECTED_HOSTS = {"www.camsoda.com", "camsoda.com"}
|
|
|
|
|
|
def _is_cf_protected(url: str) -> bool:
    """Tell whether *url* is served from a known Cloudflare-protected host.

    Any failure while examining the URL is treated as "not protected".
    """
    try:
        from urllib.parse import urlparse

        hostname = urlparse(url).hostname
        if not hostname:
            hostname = ""
        return hostname in _CF_PROTECTED_HOSTS
    except Exception:
        return False
|
|
|
|
|
|
def http_ok(url: str, headers: dict | None = None) -> tuple[bool, int]:
    """Probe *url* and report whether it answered with HTTP 200 or 206.

    A HEAD request (following redirects) is tried first. If the server replies
    405, a streamed GET asking for the first 1024 bytes is issued instead.

    Args:
        url: Address to probe.
        headers: Extra request headers; they override the default User-Agent.

    Returns:
        ``(ok, status)``. When no HTTP status is available ``status`` is
        negative: -1 for a timeout, -2 for a connection error, -3 for any
        other exception.
    """
    success_codes = (200, 206)
    request_headers = {"User-Agent": _BROWSER_UA}
    request_headers.update(headers or {})
    try:
        head = requests.head(
            url, headers=request_headers, timeout=HTTP_TIMEOUT, allow_redirects=True
        )
        if head.status_code != 405:
            return head.status_code in success_codes, head.status_code

        # HEAD is not allowed: fall back to a small ranged GET.
        ranged = requests.get(
            url,
            headers=request_headers | {"Range": "bytes=0-1023"},
            timeout=HTTP_TIMEOUT,
            stream=True,
        )
        ranged.close()
        return ranged.status_code in success_codes, ranged.status_code
    except requests.exceptions.Timeout:
        return False, -1  # timeout
    except requests.exceptions.ConnectionError:
        return False, -2  # connection refused / DNS
    except Exception as e:
        log.debug("http_ok exception for %s: %s", url, e)
        return False, -3
|
|
|
|
|
|
def ytdlp_extract(url: str, extra_args: list[str] | None = None) -> tuple[Optional[dict], str]:
    """Run ``yt-dlp -j`` on *url* and return ``(info, stderr_text)``.

    ``-j`` outputs one JSON object per line; only the first non-empty line is
    parsed. ``info`` is ``None`` whenever extraction fails, and the second
    element then describes the failure: yt-dlp's stderr for a non-zero exit,
    or a short message for a timeout, empty/unparsable output, or a missing
    executable.

    Args:
        url: URL handed to yt-dlp.
        extra_args: Additional command-line options, placed before the URL.
    """
    cmd = [
        "yt-dlp", "-j", "--no-warnings", "--socket-timeout", "20",
        *(extra_args or []),
        # End-of-options marker: the URL originates from scraped data, so it
        # must never be parsed as a yt-dlp flag even if it starts with "-".
        "--",
        url,
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, timeout=YTDLP_TIMEOUT)
        stderr = proc.stderr.decode("utf-8", errors="replace").strip()
        if proc.returncode != 0:
            return None, stderr
        first_line = next(
            (line for line in proc.stdout.splitlines() if line.strip()), b""
        )
        if not first_line:
            return None, "yt-dlp produced no output"
        return json.loads(first_line), stderr
    except subprocess.TimeoutExpired:
        return None, "yt-dlp timed out"
    except json.JSONDecodeError as e:
        return None, f"invalid JSON from yt-dlp: {e}"
    except FileNotFoundError:
        return None, "yt-dlp not found in PATH"
    except Exception as e:
        return None, str(e)
|
|
|
|
|
|
# Headers yt-dlp always injects regardless of site — not meaningful to compare.
# Names are stored lower-cased because header dicts are lower-cased before
# being compared against this set.
_YTDLP_BUILTIN_HEADERS = frozenset(
    {
        "user-agent",
        "accept",
        "accept-language",
        "accept-encoding",
        "accept-charset",
        "connection",
        "sec-fetch-mode",
    }
)
|
|
|
|
|
|
def compare_format_fields(
|
|
api_fmt: dict,
|
|
yt_fmt: dict,
|
|
channel: str,
|
|
label: str,
|
|
results: Results,
|
|
):
|
|
"""Flag differences in ext, protocol, video_ext and http_headers."""
|
|
for field in ("ext", "protocol", "video_ext"):
|
|
api_val = api_fmt.get(field)
|
|
yt_val = yt_fmt.get(field)
|
|
if api_val and yt_val and api_val != yt_val:
|
|
results.warn(
|
|
channel,
|
|
f"{label}: {field} mismatch — api='{api_val}' yt-dlp='{yt_val}'",
|
|
)
|
|
|
|
api_headers = {k.lower(): v for k, v in (api_fmt.get("http_headers") or {}).items()}
|
|
yt_headers = {k.lower(): v for k, v in (yt_fmt.get("http_headers") or {}).items()}
|
|
|
|
# Headers declared by the API that yt-dlp also knows about — values must match.
|
|
for key, api_val in api_headers.items():
|
|
if key in _YTDLP_BUILTIN_HEADERS:
|
|
continue
|
|
if key not in yt_headers:
|
|
results.warn(channel, f"{label}: http_header '{key}' in api but absent in yt-dlp")
|
|
elif yt_headers[key] != api_val:
|
|
results.warn(
|
|
channel,
|
|
f"{label}: http_header '{key}' mismatch — api='{api_val}' yt-dlp='{yt_headers[key]}'",
|
|
)
|
|
|
|
# Non-builtin headers yt-dlp requires that the API does not declare.
|
|
for key, yt_val in yt_headers.items():
|
|
if key in _YTDLP_BUILTIN_HEADERS or key in api_headers:
|
|
continue
|
|
results.warn(channel, f"{label}: http_header '{key}' required by yt-dlp but not declared in api")
|
|
|
|
|
|
def is_page_url(url: str) -> bool:
    """True if the URL looks like a browseable page, not a media file or local proxy.

    A URL is rejected when it is not http(s), when it mentions ``127.0.0.1``
    or ``localhost`` anywhere, or when its *path* ends in a known media
    extension. Only the path component is inspected, so a query string or
    ``#fragment`` after the file name does not hide the extension, and a bare
    host such as ``https://example.mov`` is not mistaken for a media file.
    """
    from urllib.parse import urlsplit

    media_exts = (".mp4", ".m3u8", ".ts", ".webm", ".mkv", ".flv", ".avi", ".mov")

    if not url.startswith(("http://", "https://")):
        return False
    if "127.0.0.1" in url or "localhost" in url:
        return False
    try:
        path = urlsplit(url).path
    except ValueError:
        # Malformed URL (e.g. bad IPv6 literal): fall back to a plain split.
        path = url.split("#")[0].split("?")[0]
    return not path.lower().endswith(media_exts)
|
|
|
|
|
|
def is_media_file_url(url: str) -> bool:
    """True if the URL directly points to a media file (not a page, not HLS).

    Only the path component is inspected, so a query string or ``#fragment``
    after the file name does not hide the extension, and a bare host on a
    media-like TLD (``https://example.mov``) is not treated as a file.
    ``.m3u8`` is deliberately absent from the extension list.
    """
    from urllib.parse import urlsplit

    media_exts = (".mp4", ".webm", ".mkv", ".flv", ".avi", ".mov", ".ts")

    try:
        path = urlsplit(url).path
    except ValueError:
        # Malformed URL (e.g. bad IPv6 literal): fall back to a plain split.
        path = url.split("#")[0].split("?")[0]
    return path.lower().endswith(media_exts)
|
|
|
|
|
|
def follow_proxy_redirect(url: str) -> str:
    """If url is a localhost proxy URL, follow one redirect to get the real URL.

    URLs that mention neither ``127.0.0.1`` nor ``localhost`` are returned
    untouched without any network access. Otherwise a single HEAD request is
    sent with redirects disabled; when it answers with a redirect whose target
    is not itself local, that target is returned. A relative or
    scheme-relative ``Location`` is resolved against *url* first, so the
    result is always an absolute URL.

    This is best-effort: on any failure the original *url* is returned.
    """
    from urllib.parse import urljoin

    if "127.0.0.1" not in url and "localhost" not in url:
        return url
    try:
        r = requests.head(url, timeout=HTTP_TIMEOUT, allow_redirects=False)
        if r.status_code in (301, 302, 303, 307, 308):
            loc = r.headers.get("Location", "")
            if loc:
                target = urljoin(url, loc)
                if "127.0.0.1" not in target and "localhost" not in target:
                    return target
    except Exception as e:
        # Deliberately non-fatal; keep a trace for debugging.
        log.debug("follow_proxy_redirect failed for %s: %s", url, e)
    return url
|
|
|
|
|
|
def titles_match(a: str, b: str) -> bool:
    """Loosely compare two titles.

    Both titles are lower-cased and stripped. They match when the leading
    characters of either one (at most 20, capped at the shorter title's
    length) occur somewhere in the other. An empty title matches anything.
    """
    first, second = (text.lower().strip() for text in (a, b))
    if not (first and second):
        return True
    span = min(20, len(first), len(second))
    if first[:span] in second:
        return True
    return second[:span] in first
|
|
|
|
|
|
def _check_urls_reachable(
    vurl: str,
    thumb: str,
    formats: list[dict],
    channel_id: str,
    label: str,
    results: Results,
):
    """Probe video.url, video.thumb and every formats[].url over HTTP."""
    # video.url must not point to the hottub server itself
    if not vurl:
        results.err(channel_id, f"{label}: missing url")
    elif "127.0.0.1" in vurl or "localhost" in vurl:
        results.err(channel_id, f"{label}: url points to hottub server: {vurl}")
    else:
        ok, code = http_ok(vurl)
        if ok:
            results.info(channel_id, f"{label}: url OK (HTTP {code})")
        elif _is_cf_protected(vurl):
            results.warn(channel_id, f"{label}: url unreachable HTTP={code} (CF-protected host, expected): {vurl}")
        else:
            results.err(channel_id, f"{label}: url unreachable HTTP={code}: {vurl}")

    # video.thumb
    if thumb:
        ok, code = http_ok(thumb)
        if ok:
            results.info(channel_id, f"{label}: thumb OK (HTTP {code})")
        else:
            results.err(channel_id, f"{label}: thumb unreachable HTTP={code}: {thumb}")
    else:
        results.warn(channel_id, f"{label}: no thumb")

    # video.formats[].url with declared http_headers
    for j, fmt in enumerate(formats):
        furl = fmt.get("url", "")
        fheaders: dict[str, str] = fmt.get("http_headers") or {}
        if not furl:
            results.err(channel_id, f"{label} format[{j}]: missing url")
            continue
        ok, code = http_ok(furl, headers=fheaders)
        if ok:
            results.info(channel_id, f"{label} format[{j}]: OK (HTTP {code})")
            continue
        header_note = f" (headers={list(fheaders.keys())})" if fheaders else ""
        results.err(
            channel_id,
            f"{label} format[{j}]: unreachable HTTP={code}{header_note}: {furl}",
        )


def _check_page_with_ytdlp(
    video: dict,
    page_url: str,
    channel_id: str,
    label: str,
    results: Results,
):
    """Run yt-dlp on the page URL and compare its title/duration with the API's."""
    results.info(channel_id, f"{label}: yt-dlp extract {page_url}")
    yt, stderr = ytdlp_extract(page_url)
    if yt is None:
        if _is_cf_protected(page_url):
            results.warn(
                channel_id,
                f"{label}: yt-dlp failed for {page_url} (CF-protected host, expected)"
                + (f": {stderr[:200]}" if stderr else ""),
            )
        else:
            results.err(
                channel_id,
                f"{label}: yt-dlp failed for {page_url}"
                + (f": {stderr[:300]}" if stderr else ""),
            )
        return

    yt_title = (yt.get("title") or "").strip()
    api_title = (video.get("title") or "").strip()
    # titles_match() treats an empty title on either side as a match.
    if not titles_match(api_title, yt_title):
        results.warn(
            channel_id,
            f"{label}: title mismatch — yt-dlp='{yt_title[:50]}' api='{api_title[:50]}'",
        )

    yt_dur = yt.get("duration")
    api_dur = video.get("duration") or 0
    # Durations are compared with a 10 second tolerance, and only when both
    # sides report one.
    if yt_dur is not None and api_dur and abs(float(yt_dur) - float(api_dur)) > 10:
        results.warn(
            channel_id,
            f"{label}: duration mismatch — yt-dlp={yt_dur}s api={api_dur}s",
        )

    yt_fmts = yt.get("formats") or []
    # yt-dlp puts the url at top level in the single-format case.
    yt_direct = yt.get("url")
    if not yt_fmts and not yt_direct:
        results.err(
            channel_id,
            f"{label}: yt-dlp returned no formats/url for {page_url}",
        )
    else:
        results.info(
            channel_id,
            f"{label}: yt-dlp OK — formats={len(yt_fmts)}"
            + (f" title='{yt_title[:50]}'" if yt_title else ""),
        )


def _check_formats_with_ytdlp(
    formats: list[dict],
    channel_id: str,
    label: str,
    results: Results,
):
    """Run yt-dlp on every format URL that is not a direct media file."""
    for j, fmt in enumerate(formats):
        furl = fmt.get("url", "")
        if not furl or is_media_file_url(furl):
            continue
        fheaders: dict[str, str] = fmt.get("http_headers") or {}
        # Forward the declared headers so yt-dlp requests the URL the same way.
        extra_args: list[str] = []
        for hk, hv in fheaders.items():
            extra_args += ["--add-header", f"{hk}:{hv}"]
        results.info(channel_id, f"{label} format[{j}]: yt-dlp extract {furl}")
        yt, stderr = ytdlp_extract(furl, extra_args=extra_args)
        if yt is None:
            results.err(
                channel_id,
                f"{label} format[{j}]: yt-dlp failed for {furl}"
                + (f": {stderr[:200]}" if stderr else ""),
            )
            continue
        yt_fmts = yt.get("formats") or []
        if not yt_fmts and not yt.get("url"):
            results.err(
                channel_id,
                f"{label} format[{j}]: yt-dlp returned no formats/url for {furl}",
            )
        else:
            results.info(
                channel_id,
                f"{label} format[{j}]: yt-dlp OK — formats={len(yt_fmts)}",
            )


def check_video(video: dict, channel_id: str, results: Results, run_ytdlp: bool):
    """Run every per-video check and record the findings in *results*.

    Steps:
      1. HTTP reachability of ``video.url``, ``video.thumb`` and each
         ``video.formats[].url`` (sent with that format's ``http_headers``).
      2. If *run_ytdlp*: yt-dlp extraction of ``video.url`` when it is a page
         URL and the API declared no formats, comparing title and duration.
      3. If *run_ytdlp*: yt-dlp extraction of every format URL that is not a
         direct media file.

    No API-vs-yt-dlp format comparison happens in step 2: that step only runs
    when the API declared no formats, so there is nothing to compare against.

    Args:
        video: One item from the ``/api/videos`` response.
        channel_id: Channel id the findings are filed under.
        results: Collector for errors and warnings.
        run_ytdlp: When false, only the HTTP reachability checks are run.
    """
    label = f"id={video.get('id', '?')}"
    vurl = video.get("url", "")
    thumb = video.get("thumb", "")
    formats: list[dict] = video.get("formats") or []

    _check_urls_reachable(vurl, thumb, formats, channel_id, label, results)

    if not run_ytdlp:
        return

    # yt-dlp info extraction on video.url (page URLs only, skipped when formats
    # are provided). is_page_url() already rejects empty and localhost URLs.
    if not formats and vurl and is_page_url(vurl):
        _check_page_with_ytdlp(video, vurl, channel_id, label, results)

    # yt-dlp on format URLs that are not direct media files
    _check_formats_with_ytdlp(formats, channel_id, label, results)
|
|
|
|
|
|
def check_channel(channel_id: str, channel_name: str, base: str, results: Results, run_ytdlp: bool):
    """Fetch the newest videos of one channel and check each of them.

    Posts to ``{base}/api/videos`` asking for the first page sorted by date,
    then runs ``check_video`` on up to ``VIDEOS_TO_SAMPLE`` returned items.
    A failed request, a non-200 status, unparsable or non-object JSON, and an
    empty item list are each recorded as an error and end the channel check.

    Args:
        channel_id: Channel id sent to the API and used to tag findings.
        channel_name: Human-readable name, used only in the start log line.
        base: Server base URL without a trailing slash.
        results: Collector for errors and warnings.
        run_ytdlp: Forwarded to ``check_video``.
    """
    results.info(channel_id, f"--- start '{channel_name}' ---")

    payload = {
        "channel": channel_id,
        "sort": "date",
        "page": "1",
        "perPage": str(VIDEOS_TO_SAMPLE),
    }
    try:
        resp = requests.post(f"{base}/api/videos", json=payload, timeout=30)
    except Exception as e:
        results.err(channel_id, f"videos request failed: {e}")
        return

    if resp.status_code != 200:
        results.err(channel_id, f"videos endpoint returned HTTP {resp.status_code}")
        return

    try:
        data = resp.json()
    except Exception as e:
        results.err(channel_id, f"could not parse videos response: {e}")
        return

    # Valid JSON is not necessarily an object; guard before calling .get().
    if not isinstance(data, dict):
        results.err(
            channel_id,
            f"could not parse videos response: expected a JSON object, got {type(data).__name__}",
        )
        return

    items: list[dict] = data.get("items") or []
    if not items:
        results.err(channel_id, "no items returned by /api/videos")
        return

    results.info(channel_id, f"{len(items)} item(s) returned")
    # The server may return more than requested; only sample the first few.
    for video in items[:VIDEOS_TO_SAMPLE]:
        check_video(video, channel_id, results, run_ytdlp)
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments, run the checks, print a summary.

    With a channel argument only that channel is checked. Otherwise
    ``/api/status`` is queried first and every channel it lists is checked,
    in parallel when ``--workers`` is greater than one.

    Always terminates through ``sys.exit``: status 1 when ``/api/status`` is
    unusable or any error was recorded, 0 otherwise (warnings alone do not
    fail the run).
    """
    parser = argparse.ArgumentParser(description="Hottub channel health checker")
    parser.add_argument("channel", nargs="?", help="single channel id to test")
    parser.add_argument("--url", default=DEFAULT_BASE, metavar="BASE_URL",
                        help=f"server base URL (default: {DEFAULT_BASE})")
    parser.add_argument("--no-ytdlp", action="store_true",
                        help="skip yt-dlp extraction checks")
    parser.add_argument("--workers", type=int, default=4,
                        help="parallel channel workers (default: 4)")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="show INFO log lines")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.WARNING,
        format="%(asctime)s %(levelname)-7s %(message)s",
        datefmt="%H:%M:%S",
    )
    # Always print ERRORs and WARNINGs; INFO only in verbose mode
    logging.getLogger("check").setLevel(
        logging.INFO if args.verbose else logging.WARNING
    )

    base = args.url.rstrip("/")
    run_ytdlp = not args.no_ytdlp

    results = Results()

    if args.channel:
        # Single channel: skip status, go directly to the channel check
        channels = [{"id": args.channel, "name": args.channel}]
    else:
        # No channel specified: check status first, then all channels
        print(f"checking {base}/api/status ...")
        try:
            resp = requests.get(f"{base}/api/status", timeout=15)
        except Exception as e:
            print(f"ERROR: /api/status unreachable: {e}")
            sys.exit(1)

        if resp.status_code != 200:
            print(f"ERROR: /api/status returned HTTP {resp.status_code}")
            sys.exit(1)

        try:
            status_data = resp.json()
        except Exception as e:
            print(f"ERROR: could not parse /api/status response: {e}")
            sys.exit(1)

        # Valid JSON that is not an object carries no channel list.
        channels: list[dict] = (
            status_data.get("channels") if isinstance(status_data, dict) else None
        ) or []
        if not channels:
            print("ERROR: no channels in /api/status response")
            sys.exit(1)

        print(f"status OK — {len(channels)} channels")

    # --- per-channel checks ---
    if len(channels) == 1 or args.workers <= 1:
        for ch in channels:
            # Same handling as the threaded path: one crashing channel is
            # recorded as an error instead of aborting the whole run.
            try:
                check_channel(ch["id"], ch.get("name", ch["id"]), base, results, run_ytdlp)
            except Exception as e:
                results.err(ch["id"], f"unexpected exception: {e}")
    else:
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            futs = {
                pool.submit(check_channel, ch["id"], ch.get("name", ch["id"]), base, results, run_ytdlp): ch
                for ch in channels
            }
            for fut in as_completed(futs):
                try:
                    fut.result()
                except Exception as e:
                    ch = futs[fut]
                    results.err(ch["id"], f"unexpected exception: {e}")

    # --- summary ---
    print()
    if not results.errors and not results.warnings:
        print(f"All checks passed ({len(channels)} channel(s) tested).")
        sys.exit(0)

    if results.errors:
        print(f"=== {len(results.errors)} error(s) ===")
        for e in results.errors:
            print(f" {e}")

    if results.warnings:
        print(f"=== {len(results.warnings)} warning(s) ===")
        for w in results.warnings:
            print(f" {w}")

    sys.exit(1 if results.errors else 0)
|
|
|
|
|
|
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|