runtime error handling

This commit is contained in:
Simon
2026-03-29 16:24:49 +00:00
parent 99fe4c947c
commit 243d19cec0
4 changed files with 653 additions and 6 deletions

View File

@@ -65,6 +65,7 @@ async fn main() -> std::io::Result<()> {
.max_size(100_000)
.to_owned();
crate::flow_debug!("video cache initialized max_size=100000");
let _ = providers::configure_runtime_validation(pool.clone(), cache.clone(), requester.clone());
thread::spawn(move || {
crate::flow_debug!("provider init thread spawned");

View File

@@ -1,17 +1,19 @@
use async_trait::async_trait;
use dashmap::{DashMap, DashSet};
use futures::FutureExt;
use once_cell::sync::Lazy;
use rustc_hash::FxHashMap as HashMap;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use crate::{
DbPool,
api::ClientVersion,
status::{Channel, ChannelGroup, ChannelView, Status, StatusResponse},
status::{Channel, ChannelGroup, ChannelView, FilterOption, Status, StatusResponse},
util::{cache::VideoCache, discord::send_discord_error_report, requester::Requester},
videos::{ServerOptions, VideoItem},
videos::{FlexibleNumber, ServerOptions, VideoItem, VideosRequest},
};
include!(concat!(env!("OUT_DIR"), "/provider_selection.rs"));
@@ -32,6 +34,319 @@ pub static ALL_PROVIDERS: Lazy<HashMap<&'static str, DynProvider>> = Lazy::new(|
m
});
const CHANNEL_STATUS_ERROR: &str = "error";
const VALIDATION_RESULTS_REQUIRED: usize = 5;
const VALIDATION_COOLDOWN: Duration = Duration::from_secs(60);
#[derive(Clone)]
struct ProviderValidationContext {
pool: DbPool,
cache: VideoCache,
requester: Requester,
}
static PROVIDER_VALIDATION_CONTEXT: OnceLock<ProviderValidationContext> = OnceLock::new();
static PROVIDER_RUNTIME_STATUS: Lazy<DashMap<String, String>> = Lazy::new(DashMap::new);
static PROVIDER_VALIDATION_INFLIGHT: Lazy<DashSet<String>> = Lazy::new(DashSet::new);
static PROVIDER_VALIDATION_LAST_RUN: Lazy<DashMap<String, Instant>> = Lazy::new(DashMap::new);
fn validation_client_version() -> ClientVersion {
ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string())
}
fn preferred_option_value(options: &[FilterOption]) -> Option<String> {
options
.iter()
.find(|option| option.id == "all")
.or_else(|| options.first())
.map(|option| option.id.clone())
}
fn channel_option_value(channel: &Channel, option_id: &str) -> Option<String> {
channel
.options
.iter()
.find(|option| option.id == option_id)
.and_then(|option| preferred_option_value(&option.options))
}
fn validation_request_for_channel(channel: &Channel) -> VideosRequest {
VideosRequest {
clientHash: None,
blockedKeywords: None,
countryCode: None,
clientVersion: Some("2.1.4-22b".to_string()),
timestamp: None,
blockedUploaders: None,
anonId: None,
debugTools: None,
versionInstallDate: None,
languageCode: Some("en".to_string()),
appInstallDate: None,
server: None,
sexuality: channel_option_value(channel, "sexuality"),
channel: Some(channel.id.clone()),
sort: channel_option_value(channel, "sort").or_else(|| Some("new".to_string())),
query: None,
page: Some(FlexibleNumber::Int(1)),
perPage: Some(FlexibleNumber::Int(VALIDATION_RESULTS_REQUIRED as u64)),
featured: channel_option_value(channel, "featured"),
category: channel_option_value(channel, "category"),
sites: channel_option_value(channel, "sites"),
all_provider_sites: None,
filter: channel_option_value(channel, "filter"),
language: channel_option_value(channel, "language"),
networks: channel_option_value(channel, "networks"),
stars: channel_option_value(channel, "stars"),
categories: channel_option_value(channel, "categories"),
duration: channel_option_value(channel, "duration"),
}
}
fn media_target(item: &VideoItem) -> (String, Vec<(String, String)>) {
if let Some(format) = item.formats.as_ref().and_then(|formats| formats.first()) {
return (format.url.clone(), format.http_headers_pairs());
}
(item.url.clone(), Vec::new())
}
fn looks_like_media(content_type: &str, body: &[u8]) -> bool {
let content_type = content_type.to_ascii_lowercase();
if content_type.starts_with("video/")
|| content_type.starts_with("audio/")
|| content_type.contains("mpegurl")
|| content_type.contains("mp2t")
|| content_type.contains("octet-stream")
{
return true;
}
body.starts_with(b"#EXTM3U")
|| body.windows(4).any(|window| window == b"ftyp")
|| body.windows(4).any(|window| window == b"mdat")
}
async fn validate_media_response(
provider_id: &str,
item_index: usize,
url: &str,
mut headers: Vec<(String, String)>,
requester: Requester,
) -> Result<(), String> {
if !headers
.iter()
.any(|(key, _)| key.eq_ignore_ascii_case("range"))
{
headers.push(("Range".to_string(), "bytes=0-2047".to_string()));
}
let mut requester = requester;
let response = requester
.get_raw_with_headers(url, headers)
.await
.map_err(|err| {
format!(
"{provider_id} item {} request failed for {url}: {err}",
item_index + 1
)
})?;
let status = response.status();
if !status.is_success() {
return Err(format!(
"{provider_id} item {} request returned {status} for {url}",
item_index + 1
));
}
let content_type = response
.headers()
.get("content-type")
.and_then(|value| value.to_str().ok())
.unwrap_or("")
.to_string();
let body = response.bytes().await.map_err(|err| {
format!(
"{provider_id} item {} body read failed for {url}: {err}",
item_index + 1
)
})?;
if body.is_empty() {
return Err(format!(
"{provider_id} item {} returned an empty body for {url}",
item_index + 1
));
}
if !looks_like_media(&content_type, body.as_ref()) {
return Err(format!(
"{provider_id} item {} did not look like media for {url}; content-type={content_type:?}",
item_index + 1
));
}
Ok(())
}
async fn run_provider_validation(provider_id: &str) -> Result<(), String> {
let Some(context) = PROVIDER_VALIDATION_CONTEXT.get().cloned() else {
return Err("provider validation context not configured".to_string());
};
let provider = ALL_PROVIDERS
.get(provider_id)
.cloned()
.ok_or_else(|| format!("unknown provider: {provider_id}"))?;
let channel = provider
.get_channel(validation_client_version())
.ok_or_else(|| format!("channel unavailable for provider: {provider_id}"))?;
let request = validation_request_for_channel(&channel);
let sort = request.sort.clone().unwrap_or_else(|| "date".to_string());
let page = request
.page
.as_ref()
.and_then(|value| value.to_u8())
.unwrap_or(1);
let per_page = request
.perPage
.as_ref()
.and_then(|value| value.to_u8())
.unwrap_or(VALIDATION_RESULTS_REQUIRED as u8);
let options = ServerOptions {
featured: request.featured.clone(),
category: request.category.clone(),
sites: request.sites.clone(),
filter: request.filter.clone(),
language: request.language.clone(),
public_url_base: None,
requester: Some(context.requester.clone()),
network: request.networks.clone(),
stars: request.stars.clone(),
categories: request.categories.clone(),
duration: request.duration.clone(),
sort: Some(sort.clone()),
sexuality: request.sexuality.clone(),
};
let items = run_provider_guarded(
provider_id,
"runtime_validation",
provider.get_videos(
context.cache.clone(),
context.pool.clone(),
sort,
request.query.clone(),
page.to_string(),
per_page.to_string(),
options,
),
)
.await;
if items.len() < VALIDATION_RESULTS_REQUIRED {
return Err(format!(
"{provider_id} returned fewer than {VALIDATION_RESULTS_REQUIRED} items: {}",
items.len()
));
}
for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() {
let (url, headers) = media_target(item);
if url.is_empty() {
return Err(format!(
"{provider_id} item {} returned an empty media url",
item_index + 1
));
}
validate_media_response(
provider_id,
item_index,
&url,
headers,
context.requester.clone(),
)
.await?;
}
Ok(())
}
pub fn configure_runtime_validation(
pool: DbPool,
cache: VideoCache,
requester: Requester,
) -> Result<(), &'static str> {
PROVIDER_VALIDATION_CONTEXT
.set(ProviderValidationContext {
pool,
cache,
requester,
})
.map_err(|_| "provider validation context already configured")
}
pub fn current_provider_channel_status(provider_id: &str) -> Option<String> {
PROVIDER_RUNTIME_STATUS
.get(provider_id)
.map(|status| status.value().clone())
}
pub fn schedule_provider_validation(provider_id: &str, context: &str, msg: &str) {
if !ALL_PROVIDERS.contains_key(provider_id) {
return;
}
if PROVIDER_VALIDATION_CONTEXT.get().is_none() {
return;
}
if let Some(last_run) = PROVIDER_VALIDATION_LAST_RUN.get(provider_id) {
if last_run.elapsed() < VALIDATION_COOLDOWN {
crate::flow_debug!(
"provider validation skip cooldown provider={} context={} error={}",
provider_id,
context,
crate::util::flow_debug::preview(msg, 120)
);
return;
}
}
let provider_id = provider_id.to_string();
if !PROVIDER_VALIDATION_INFLIGHT.insert(provider_id.clone()) {
return;
}
PROVIDER_VALIDATION_LAST_RUN.insert(provider_id.clone(), Instant::now());
let _context_name = context.to_string();
let _error_preview = msg.to_string();
tokio::spawn(async move {
crate::flow_debug!(
"provider validation start provider={} context={} error={}",
&provider_id,
&_context_name,
crate::util::flow_debug::preview(&_error_preview, 120)
);
let validation_result = run_provider_validation(&provider_id).await;
match validation_result {
Ok(()) => {
PROVIDER_RUNTIME_STATUS.remove(&provider_id);
}
Err(_validation_error) => {
PROVIDER_RUNTIME_STATUS
.insert(provider_id.clone(), CHANNEL_STATUS_ERROR.to_string());
crate::flow_debug!(
"provider validation failed provider={} error={}",
&provider_id,
crate::util::flow_debug::preview(&_validation_error, 160)
);
}
}
PROVIDER_VALIDATION_INFLIGHT.remove(&provider_id);
});
}
pub fn init_providers_now() {
// Idempotent & thread-safe: runs the Lazy init exactly once.
crate::flow_debug!(
@@ -99,6 +414,7 @@ where
module_path!(),
)
.await;
schedule_provider_validation(provider_name, context, &panic_msg);
vec![]
}
}
@@ -115,6 +431,7 @@ pub async fn report_provider_error(provider_name: &str, context: &str, msg: &str
module_path!(),
)
.await;
schedule_provider_validation(provider_name, context, msg);
}
pub fn report_provider_error_background(provider_name: &str, context: &str, msg: &str) {
@@ -232,6 +549,7 @@ fn channel_group_order(group_id: &str) -> usize {
pub fn decorate_channel(channel: Channel) -> ChannelView {
let metadata = channel_metadata_for(&channel.id);
let runtime_status = current_provider_channel_status(&channel.id);
let ytdlp_command = match channel.id.as_str() {
"pimpbunny" => Some("yt-dlp --compat-options allow-unsafe-ext".to_string()),
_ => None,
@@ -242,7 +560,7 @@ pub fn decorate_channel(channel: Channel) -> ChannelView {
description: channel.description,
premium: channel.premium,
favicon: channel.favicon,
status: channel.status,
status: runtime_status.unwrap_or(channel.status),
categories: channel.categories,
options: channel.options,
nsfw: channel.nsfw,
@@ -398,7 +716,37 @@ pub trait Provider: Send + Sync {
#[cfg(all(test, not(hottub_single_provider)))]
mod tests {
use super::*;
use crate::status::ChannelOption;
use crate::api::ClientVersion;
use crate::status::{ChannelOption, FilterOption};
use crate::util::cache::VideoCache;
use crate::util::requester::Requester;
use crate::videos::{FlexibleNumber, VideosRequest};
use diesel::{
SqliteConnection,
r2d2::{self, ConnectionManager},
};
use ntex::http::header;
use ntex::web::{self, test};
use serde::Deserialize;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Deserialize)]
struct ApiVideosResponse {
items: Vec<ApiVideoItem>,
}
#[derive(Debug, Deserialize)]
struct ApiVideoItem {
url: String,
formats: Option<Vec<ApiVideoFormat>>,
}
#[derive(Debug, Deserialize)]
struct ApiVideoFormat {
url: String,
http_headers: Option<HashMap<String, String>>,
}
fn base_channel(id: &str) -> Channel {
Channel {
@@ -415,8 +763,172 @@ mod tests {
}
}
fn test_db_pool() -> DbPool {
let unique = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after unix epoch")
.as_nanos();
let database_url = std::env::temp_dir()
.join(format!("hottub-providers-test-{unique}.sqlite"))
.to_string_lossy()
.into_owned();
let manager = ConnectionManager::<SqliteConnection>::new(database_url);
r2d2::Pool::builder()
.max_size(8)
.build(manager)
.expect("test db pool should build")
}
fn preferred_option_value(options: &[FilterOption]) -> Option<String> {
options
.iter()
.find(|option| option.id == "all")
.or_else(|| options.first())
.map(|option| option.id.clone())
}
fn channel_option_value(channel: &Channel, option_id: &str) -> Option<String> {
channel
.options
.iter()
.find(|option| option.id == option_id)
.and_then(|option| preferred_option_value(&option.options))
}
fn request_for_channel(channel: &Channel) -> VideosRequest {
VideosRequest {
clientHash: None,
blockedKeywords: None,
countryCode: None,
clientVersion: Some("2.1.4-22b".to_string()),
timestamp: None,
blockedUploaders: None,
anonId: None,
debugTools: None,
versionInstallDate: None,
languageCode: Some("en".to_string()),
appInstallDate: None,
server: None,
sexuality: channel_option_value(channel, "sexuality"),
channel: Some(channel.id.clone()),
sort: channel_option_value(channel, "sort").or_else(|| Some("new".to_string())),
query: None,
page: Some(FlexibleNumber::Int(1)),
perPage: Some(FlexibleNumber::Int(5)),
featured: channel_option_value(channel, "featured"),
category: channel_option_value(channel, "category"),
sites: channel_option_value(channel, "sites"),
all_provider_sites: None,
filter: channel_option_value(channel, "filter"),
language: channel_option_value(channel, "language"),
networks: channel_option_value(channel, "networks"),
stars: channel_option_value(channel, "stars"),
categories: channel_option_value(channel, "categories"),
duration: channel_option_value(channel, "duration"),
}
}
fn skip_reason_for_provider(provider_id: &str) -> Option<&'static str> {
if std::env::var("FLARE_URL").is_ok() {
return None;
}
match provider_id {
"hentaihaven" | "homoxxx" | "okporn" | "okxxx" | "perfectgirls" => {
Some("requires FLARE_URL for live requests in this environment")
}
_ => None,
}
}
fn media_target(item: &ApiVideoItem) -> (String, Vec<(String, String)>) {
if let Some(format) = item.formats.as_ref().and_then(|formats| formats.first()) {
let headers = format
.http_headers
.clone()
.unwrap_or_default()
.into_iter()
.collect();
return (format.url.clone(), headers);
}
(item.url.clone(), Vec::new())
}
fn looks_like_media(content_type: &str, body: &[u8]) -> bool {
let content_type = content_type.to_ascii_lowercase();
if content_type.starts_with("video/")
|| content_type.starts_with("audio/")
|| content_type.contains("mpegurl")
|| content_type.contains("mp2t")
|| content_type.contains("octet-stream")
{
return true;
}
body.starts_with(b"#EXTM3U")
|| body.windows(4).any(|window| window == b"ftyp")
|| body.windows(4).any(|window| window == b"mdat")
}
async fn assert_media_response(
provider_id: &str,
item_index: usize,
url: &str,
mut headers: Vec<(String, String)>,
) -> Result<(), String> {
if !headers
.iter()
.any(|(key, _)| key.eq_ignore_ascii_case("range"))
{
headers.push(("Range".to_string(), "bytes=0-2047".to_string()));
}
let mut requester = Requester::new();
let response = requester
.get_raw_with_headers(url, headers)
.await
.map_err(|err| format!("{provider_id} item {} request failed for {url}: {err}", item_index + 1))?;
let status = response.status();
if !status.is_success() {
return Err(format!(
"{provider_id} item {} request returned {status} for {url}",
item_index + 1
));
}
let content_type = response
.headers()
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or("")
.to_string();
let body = response
.bytes()
.await
.map_err(|err| format!("{provider_id} item {} body read failed for {url}: {err}", item_index + 1))?;
if body.is_empty() {
return Err(format!(
"{provider_id} item {} returned an empty body for {url}",
item_index + 1
));
}
if !looks_like_media(&content_type, body.as_ref()) {
return Err(format!(
"{provider_id} item {} did not look like media for {url}; content-type={content_type:?}",
item_index + 1
));
}
Ok(())
}
#[test]
fn decorates_channel_with_group_and_tags() {
PROVIDER_RUNTIME_STATUS.remove("hsex");
let channel = decorate_channel(base_channel("hsex"));
assert_eq!(channel.groupKey.as_deref(), Some("chinese"));
assert_eq!(channel.sortOrder, None);
@@ -435,6 +947,9 @@ mod tests {
#[test]
fn builds_group_index() {
PROVIDER_RUNTIME_STATUS.remove("all");
PROVIDER_RUNTIME_STATUS.remove("hsex");
PROVIDER_RUNTIME_STATUS.remove("missav");
let channels = vec![
decorate_channel(base_channel("all")),
decorate_channel(base_channel("hsex")),
@@ -448,6 +963,7 @@ mod tests {
#[test]
fn decorates_pimpbunny_with_ytdlp_command() {
PROVIDER_RUNTIME_STATUS.remove("pimpbunny");
let channel = decorate_channel(base_channel("pimpbunny"));
assert_eq!(
channel.ytdlpCommand.as_deref(),
@@ -457,6 +973,8 @@ mod tests {
#[test]
fn reflects_updated_group_moves() {
PROVIDER_RUNTIME_STATUS.remove("perverzija");
PROVIDER_RUNTIME_STATUS.remove("rule34gen");
assert_eq!(
decorate_channel(base_channel("perverzija"))
.groupKey
@@ -473,6 +991,10 @@ mod tests {
#[test]
fn status_response_uses_documented_group_keys() {
PROVIDER_RUNTIME_STATUS.remove("all");
PROVIDER_RUNTIME_STATUS.remove("hsex");
PROVIDER_RUNTIME_STATUS.remove("missav");
PROVIDER_RUNTIME_STATUS.remove("pimpbunny");
let mut status = Status::new();
status.channels = vec![
base_channel("missav"),
@@ -515,4 +1037,114 @@ mod tests {
"yt-dlp --compat-options allow-unsafe-ext"
);
}
#[test]
fn runtime_error_status_overrides_channel_status() {
PROVIDER_RUNTIME_STATUS.insert("hsex".to_string(), CHANNEL_STATUS_ERROR.to_string());
let channel = decorate_channel(base_channel("hsex"));
assert_eq!(channel.status, CHANNEL_STATUS_ERROR);
PROVIDER_RUNTIME_STATUS.remove("hsex");
}
#[ntex::test]
#[ignore = "live network sweep across all providers"]
async fn api_videos_returns_working_media_urls_for_all_channels() {
let app = test::init_service(
web::App::new()
.state(test_db_pool())
.state(VideoCache::new().max_size(10_000).to_owned())
.state(Requester::new())
.service(web::scope("/api").configure(crate::api::config)),
)
.await;
let client_version = ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string());
let mut channels = ALL_PROVIDERS
.iter()
.filter(|(provider_id, _)| **provider_id != "all")
.filter_map(|(_, provider)| provider.get_channel(client_version.clone()))
.collect::<Vec<_>>();
channels.sort_by(|a, b| a.id.cmp(&b.id));
let mut failures = Vec::new();
let mut skipped = Vec::new();
for channel in channels {
let payload = request_for_channel(&channel);
let provider_id = channel.id.clone();
if let Some(reason) = skip_reason_for_provider(&provider_id) {
skipped.push(format!("{provider_id}: {reason}"));
continue;
}
let request = test::TestRequest::post()
.uri("/api/videos")
.header(
header::USER_AGENT,
"Hot%20Tub/22c CFNetwork/1494.0.7 Darwin/23.4.0",
)
.set_json(&payload)
.to_request();
let response = test::call_service(&app, request).await;
let status = response.status();
let body = test::read_body(response).await;
if !status.is_success() {
failures.push(format!(
"{provider_id} returned status {status}: {}",
String::from_utf8_lossy(&body)
));
continue;
}
let payload: ApiVideosResponse = match serde_json::from_slice(&body) {
Ok(payload) => payload,
Err(error) => {
failures.push(format!(
"{provider_id} returned invalid JSON: {error}; body={}",
String::from_utf8_lossy(&body)
));
continue;
}
};
if payload.items.len() < 5 {
failures.push(format!(
"{provider_id} returned fewer than 5 items: {}",
payload.items.len()
));
continue;
}
for (item_index, item) in payload.items.iter().take(5).enumerate() {
let (url, headers) = media_target(item);
if url.is_empty() {
failures.push(format!(
"{provider_id} item {} returned an empty media url",
item_index + 1
));
break;
}
if let Err(error) =
assert_media_response(&provider_id, item_index, &url, headers).await
{
failures.push(error);
break;
}
}
}
assert!(
failures.is_empty(),
"provider live sweep failed:\n{}",
failures.join("\n")
);
if !skipped.is_empty() {
eprintln!("skipped providers:\n{}", skipped.join("\n"));
}
}
}

View File

@@ -1218,7 +1218,14 @@ mod tests {
let item = provider.apply_detail_video(item, html).expect("detail");
assert_eq!(item.title, "Real Title");
assert_eq!(item.url, "https://cdn.example/master.m3u8");
assert_eq!(item.formats.as_ref().and_then(|values| values.first()).map(|value| value.format.clone()).as_deref(), Some("m3u8"));
let first_format = item
.formats
.as_ref()
.and_then(|values| values.first())
.expect("expected a parsed format");
let first_format_json =
serde_json::to_value(first_format).expect("format should serialize");
assert_eq!(first_format_json.get("format").and_then(|value| value.as_str()), Some("m3u8"));
assert_eq!(item.duration, 1740);
assert_eq!(item.views, Some(1400));
assert_eq!(item.uploader.as_deref(), Some("Kayley Gunner"));

View File

@@ -431,6 +431,13 @@ impl VideoFormat {
}
self.to_owned()
}
pub fn http_headers_pairs(&self) -> Vec<(String, String)> {
self.http_headers
.clone()
.unwrap_or_default()
.into_iter()
.collect()
}
pub fn format_id(mut self, format_id: String) -> Self {
self.format_id = Some(format_id);
self