diff --git a/src/main.rs b/src/main.rs index 7392e8b..969f8d2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,7 @@ async fn main() -> std::io::Result<()> { .max_size(100_000) .to_owned(); crate::flow_debug!("video cache initialized max_size=100000"); + let _ = providers::configure_runtime_validation(pool.clone(), cache.clone(), requester.clone()); thread::spawn(move || { crate::flow_debug!("provider init thread spawned"); diff --git a/src/providers/mod.rs b/src/providers/mod.rs index 56c1dc3..da3ef19 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -1,17 +1,19 @@ use async_trait::async_trait; +use dashmap::{DashMap, DashSet}; use futures::FutureExt; use once_cell::sync::Lazy; use rustc_hash::FxHashMap as HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; +use std::time::{Duration, Instant}; use crate::{ DbPool, api::ClientVersion, - status::{Channel, ChannelGroup, ChannelView, Status, StatusResponse}, + status::{Channel, ChannelGroup, ChannelView, FilterOption, Status, StatusResponse}, util::{cache::VideoCache, discord::send_discord_error_report, requester::Requester}, - videos::{ServerOptions, VideoItem}, + videos::{FlexibleNumber, ServerOptions, VideoItem, VideosRequest}, }; include!(concat!(env!("OUT_DIR"), "/provider_selection.rs")); @@ -32,6 +34,319 @@ pub static ALL_PROVIDERS: Lazy> = Lazy::new(| m }); +const CHANNEL_STATUS_ERROR: &str = "error"; +const VALIDATION_RESULTS_REQUIRED: usize = 5; +const VALIDATION_COOLDOWN: Duration = Duration::from_secs(60); + +#[derive(Clone)] +struct ProviderValidationContext { + pool: DbPool, + cache: VideoCache, + requester: Requester, +} + +static PROVIDER_VALIDATION_CONTEXT: OnceLock = OnceLock::new(); +static PROVIDER_RUNTIME_STATUS: Lazy> = Lazy::new(DashMap::new); +static PROVIDER_VALIDATION_INFLIGHT: Lazy> = Lazy::new(DashSet::new); +static PROVIDER_VALIDATION_LAST_RUN: Lazy> = Lazy::new(DashMap::new); + +fn validation_client_version() -> ClientVersion { + ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string()) +} + +fn preferred_option_value(options: &[FilterOption]) -> Option { + options + .iter() + .find(|option| option.id == "all") + .or_else(|| options.first()) + .map(|option| option.id.clone()) +} + +fn channel_option_value(channel: &Channel, option_id: &str) -> Option { + channel + .options + .iter() + .find(|option| option.id == option_id) + .and_then(|option| preferred_option_value(&option.options)) +} + +fn validation_request_for_channel(channel: &Channel) -> VideosRequest { + VideosRequest { + clientHash: None, + blockedKeywords: None, + countryCode: None, + clientVersion: Some("2.1.4-22b".to_string()), + timestamp: None, + blockedUploaders: None, + anonId: None, + debugTools: None, + versionInstallDate: None, + languageCode: Some("en".to_string()), + appInstallDate: None, + server: None, + sexuality: channel_option_value(channel, "sexuality"), + channel: Some(channel.id.clone()), + sort: channel_option_value(channel, "sort").or_else(|| Some("new".to_string())), + query: None, + page: Some(FlexibleNumber::Int(1)), + perPage: Some(FlexibleNumber::Int(VALIDATION_RESULTS_REQUIRED as u64)), + featured: channel_option_value(channel, "featured"), + category: channel_option_value(channel, "category"), + sites: channel_option_value(channel, "sites"), + all_provider_sites: None, + filter: channel_option_value(channel, "filter"), + language: channel_option_value(channel, "language"), + networks: channel_option_value(channel, "networks"), + stars: channel_option_value(channel, "stars"), + categories: channel_option_value(channel, "categories"), + duration: channel_option_value(channel, "duration"), + } +} + +fn media_target(item: &VideoItem) -> (String, Vec<(String, String)>) { + if let Some(format) = item.formats.as_ref().and_then(|formats| formats.first()) { + return (format.url.clone(), format.http_headers_pairs()); + } + + (item.url.clone(), Vec::new()) +} + +fn looks_like_media(content_type: &str, body: &[u8]) -> bool { + let content_type = content_type.to_ascii_lowercase(); + if content_type.starts_with("video/") + || content_type.starts_with("audio/") + || content_type.contains("mpegurl") + || content_type.contains("mp2t") + || content_type.contains("octet-stream") + { + return true; + } + + body.starts_with(b"#EXTM3U") + || body.windows(4).any(|window| window == b"ftyp") + || body.windows(4).any(|window| window == b"mdat") +} + +async fn validate_media_response( + provider_id: &str, + item_index: usize, + url: &str, + mut headers: Vec<(String, String)>, + requester: Requester, +) -> Result<(), String> { + if !headers + .iter() + .any(|(key, _)| key.eq_ignore_ascii_case("range")) + { + headers.push(("Range".to_string(), "bytes=0-2047".to_string())); + } + + let mut requester = requester; + let response = requester + .get_raw_with_headers(url, headers) + .await + .map_err(|err| { + format!( + "{provider_id} item {} request failed for {url}: {err}", + item_index + 1 + ) + })?; + + let status = response.status(); + if !status.is_success() { + return Err(format!( + "{provider_id} item {} request returned {status} for {url}", + item_index + 1 + )); + } + + let content_type = response + .headers() + .get("content-type") + .and_then(|value| value.to_str().ok()) + .unwrap_or("") + .to_string(); + let body = response.bytes().await.map_err(|err| { + format!( + "{provider_id} item {} body read failed for {url}: {err}", + item_index + 1 + ) + })?; + + if body.is_empty() { + return Err(format!( + "{provider_id} item {} returned an empty body for {url}", + item_index + 1 + )); + } + + if !looks_like_media(&content_type, body.as_ref()) { + return Err(format!( + "{provider_id} item {} did not look like media for {url}; content-type={content_type:?}", + item_index + 1 + )); + } + + Ok(()) +} + +async fn run_provider_validation(provider_id: &str) -> Result<(), String> { + let Some(context) = PROVIDER_VALIDATION_CONTEXT.get().cloned() else { + return Err("provider validation context not configured".to_string()); + }; + let provider = ALL_PROVIDERS + .get(provider_id) + .cloned() + .ok_or_else(|| format!("unknown provider: {provider_id}"))?; + let channel = provider + .get_channel(validation_client_version()) + .ok_or_else(|| format!("channel unavailable for provider: {provider_id}"))?; + let request = validation_request_for_channel(&channel); + let sort = request.sort.clone().unwrap_or_else(|| "date".to_string()); + let page = request + .page + .as_ref() + .and_then(|value| value.to_u8()) + .unwrap_or(1); + let per_page = request + .perPage + .as_ref() + .and_then(|value| value.to_u8()) + .unwrap_or(VALIDATION_RESULTS_REQUIRED as u8); + let options = ServerOptions { + featured: request.featured.clone(), + category: request.category.clone(), + sites: request.sites.clone(), + filter: request.filter.clone(), + language: request.language.clone(), + public_url_base: None, + requester: Some(context.requester.clone()), + network: request.networks.clone(), + stars: request.stars.clone(), + categories: request.categories.clone(), + duration: request.duration.clone(), + sort: Some(sort.clone()), + sexuality: request.sexuality.clone(), + }; + + let items = run_provider_guarded( + provider_id, + "runtime_validation", + provider.get_videos( + context.cache.clone(), + context.pool.clone(), + sort, + request.query.clone(), + page.to_string(), + per_page.to_string(), + options, + ), + ) + .await; + + if items.len() < VALIDATION_RESULTS_REQUIRED { + return Err(format!( + "{provider_id} returned fewer than {VALIDATION_RESULTS_REQUIRED} items: {}", + items.len() + )); + } + + for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() { + let (url, headers) = media_target(item); + if url.is_empty() { + return Err(format!( + "{provider_id} item {} returned an empty media url", + item_index + 1 + )); + } + + validate_media_response( + provider_id, + item_index, + &url, + headers, + context.requester.clone(), + ) + .await?; + } + + Ok(()) +} + +pub fn configure_runtime_validation( + pool: DbPool, + cache: VideoCache, + requester: Requester, +) -> Result<(), &'static str> { + PROVIDER_VALIDATION_CONTEXT + .set(ProviderValidationContext { + pool, + cache, + requester, + }) + .map_err(|_| "provider validation context already configured") +} + +pub fn current_provider_channel_status(provider_id: &str) -> Option { + PROVIDER_RUNTIME_STATUS + .get(provider_id) + .map(|status| status.value().clone()) +} + +pub fn schedule_provider_validation(provider_id: &str, context: &str, msg: &str) { + if !ALL_PROVIDERS.contains_key(provider_id) { + return; + } + if PROVIDER_VALIDATION_CONTEXT.get().is_none() { + return; + } + + if let Some(last_run) = PROVIDER_VALIDATION_LAST_RUN.get(provider_id) { + if last_run.elapsed() < VALIDATION_COOLDOWN { + crate::flow_debug!( + "provider validation skip cooldown provider={} context={} error={}", + provider_id, + context, + crate::util::flow_debug::preview(msg, 120) + ); + return; + } + } + + let provider_id = provider_id.to_string(); + if !PROVIDER_VALIDATION_INFLIGHT.insert(provider_id.clone()) { + return; + } + PROVIDER_VALIDATION_LAST_RUN.insert(provider_id.clone(), Instant::now()); + let _context_name = context.to_string(); + let _error_preview = msg.to_string(); + + tokio::spawn(async move { + crate::flow_debug!( + "provider validation start provider={} context={} error={}", + &provider_id, + &_context_name, + crate::util::flow_debug::preview(&_error_preview, 120) + ); + let validation_result = run_provider_validation(&provider_id).await; + match validation_result { + Ok(()) => { + PROVIDER_RUNTIME_STATUS.remove(&provider_id); + } + Err(_validation_error) => { + PROVIDER_RUNTIME_STATUS + .insert(provider_id.clone(), CHANNEL_STATUS_ERROR.to_string()); + crate::flow_debug!( + "provider validation failed provider={} error={}", + &provider_id, + crate::util::flow_debug::preview(&_validation_error, 160) + ); + } + } + PROVIDER_VALIDATION_INFLIGHT.remove(&provider_id); + }); +} + pub fn init_providers_now() { // Idempotent & thread-safe: runs the Lazy init exactly once. crate::flow_debug!( @@ -99,6 +414,7 @@ where module_path!(), ) .await; + schedule_provider_validation(provider_name, context, &panic_msg); vec![] } } @@ -115,6 +431,7 @@ pub async fn report_provider_error(provider_name: &str, context: &str, msg: &str module_path!(), ) .await; + schedule_provider_validation(provider_name, context, msg); } pub fn report_provider_error_background(provider_name: &str, context: &str, msg: &str) { @@ -232,6 +549,7 @@ fn channel_group_order(group_id: &str) -> usize { pub fn decorate_channel(channel: Channel) -> ChannelView { let metadata = channel_metadata_for(&channel.id); + let runtime_status = current_provider_channel_status(&channel.id); let ytdlp_command = match channel.id.as_str() { "pimpbunny" => Some("yt-dlp --compat-options allow-unsafe-ext".to_string()), _ => None, @@ -242,7 +560,7 @@ pub fn decorate_channel(channel: Channel) -> ChannelView { description: channel.description, premium: channel.premium, favicon: channel.favicon, - status: channel.status, + status: runtime_status.unwrap_or(channel.status), categories: channel.categories, options: channel.options, nsfw: channel.nsfw, @@ -398,7 +716,37 @@ pub trait Provider: Send + Sync { #[cfg(all(test, not(hottub_single_provider)))] mod tests { use super::*; - use crate::status::ChannelOption; + use crate::api::ClientVersion; + use crate::status::{ChannelOption, FilterOption}; + use crate::util::cache::VideoCache; + use crate::util::requester::Requester; + use crate::videos::{FlexibleNumber, VideosRequest}; + use diesel::{ + SqliteConnection, + r2d2::{self, ConnectionManager}, + }; + use ntex::http::header; + use ntex::web::{self, test}; + use serde::Deserialize; + use std::collections::HashMap; + use std::time::{SystemTime, UNIX_EPOCH}; + + #[derive(Debug, Deserialize)] + struct ApiVideosResponse { + items: Vec, + } + + #[derive(Debug, Deserialize)] + struct ApiVideoItem { + url: String, + formats: Option>, + } + + #[derive(Debug, Deserialize)] + struct ApiVideoFormat { + url: String, + http_headers: Option>, + } fn base_channel(id: &str) -> Channel { Channel { @@ -415,8 +763,172 @@ mod tests { } } + fn test_db_pool() -> DbPool { + let unique = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after unix epoch") + .as_nanos(); + let database_url = std::env::temp_dir() + .join(format!("hottub-providers-test-{unique}.sqlite")) + .to_string_lossy() + .into_owned(); + let manager = ConnectionManager::::new(database_url); + r2d2::Pool::builder() + .max_size(8) + .build(manager) + .expect("test db pool should build") + } + + fn preferred_option_value(options: &[FilterOption]) -> Option { + options + .iter() + .find(|option| option.id == "all") + .or_else(|| options.first()) + .map(|option| option.id.clone()) + } + + fn channel_option_value(channel: &Channel, option_id: &str) -> Option { + channel + .options + .iter() + .find(|option| option.id == option_id) + .and_then(|option| preferred_option_value(&option.options)) + } + + fn request_for_channel(channel: &Channel) -> VideosRequest { + VideosRequest { + clientHash: None, + blockedKeywords: None, + countryCode: None, + clientVersion: Some("2.1.4-22b".to_string()), + timestamp: None, + blockedUploaders: None, + anonId: None, + debugTools: None, + versionInstallDate: None, + languageCode: Some("en".to_string()), + appInstallDate: None, + server: None, + sexuality: channel_option_value(channel, "sexuality"), + channel: Some(channel.id.clone()), + sort: channel_option_value(channel, "sort").or_else(|| Some("new".to_string())), + query: None, + page: Some(FlexibleNumber::Int(1)), + perPage: Some(FlexibleNumber::Int(5)), + featured: channel_option_value(channel, "featured"), + category: channel_option_value(channel, "category"), + sites: channel_option_value(channel, "sites"), + all_provider_sites: None, + filter: channel_option_value(channel, "filter"), + language: channel_option_value(channel, "language"), + networks: channel_option_value(channel, "networks"), + stars: channel_option_value(channel, "stars"), + categories: channel_option_value(channel, "categories"), + duration: channel_option_value(channel, "duration"), + } + } + + fn skip_reason_for_provider(provider_id: &str) -> Option<&'static str> { + if std::env::var("FLARE_URL").is_ok() { + return None; + } + + match provider_id { + "hentaihaven" | "homoxxx" | "okporn" | "okxxx" | "perfectgirls" => { + Some("requires FLARE_URL for live requests in this environment") + } + _ => None, + } + } + + fn media_target(item: &ApiVideoItem) -> (String, Vec<(String, String)>) { + if let Some(format) = item.formats.as_ref().and_then(|formats| formats.first()) { + let headers = format + .http_headers + .clone() + .unwrap_or_default() + .into_iter() + .collect(); + return (format.url.clone(), headers); + } + + (item.url.clone(), Vec::new()) + } + + fn looks_like_media(content_type: &str, body: &[u8]) -> bool { + let content_type = content_type.to_ascii_lowercase(); + if content_type.starts_with("video/") + || content_type.starts_with("audio/") + || content_type.contains("mpegurl") + || content_type.contains("mp2t") + || content_type.contains("octet-stream") + { + return true; + } + + body.starts_with(b"#EXTM3U") + || body.windows(4).any(|window| window == b"ftyp") + || body.windows(4).any(|window| window == b"mdat") + } + + async fn assert_media_response( + provider_id: &str, + item_index: usize, + url: &str, + mut headers: Vec<(String, String)>, + ) -> Result<(), String> { + if !headers + .iter() + .any(|(key, _)| key.eq_ignore_ascii_case("range")) + { + headers.push(("Range".to_string(), "bytes=0-2047".to_string())); + } + + let mut requester = Requester::new(); + let response = requester + .get_raw_with_headers(url, headers) + .await + .map_err(|err| format!("{provider_id} item {} request failed for {url}: {err}", item_index + 1))?; + + let status = response.status(); + if !status.is_success() { + return Err(format!( + "{provider_id} item {} request returned {status} for {url}", + item_index + 1 + )); + } + + let content_type = response + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .unwrap_or("") + .to_string(); + let body = response + .bytes() + .await + .map_err(|err| format!("{provider_id} item {} body read failed for {url}: {err}", item_index + 1))?; + + if body.is_empty() { + return Err(format!( + "{provider_id} item {} returned an empty body for {url}", + item_index + 1 + )); + } + + if !looks_like_media(&content_type, body.as_ref()) { + return Err(format!( + "{provider_id} item {} did not look like media for {url}; content-type={content_type:?}", + item_index + 1 + )); + } + + Ok(()) + } + #[test] fn decorates_channel_with_group_and_tags() { + PROVIDER_RUNTIME_STATUS.remove("hsex"); let channel = decorate_channel(base_channel("hsex")); assert_eq!(channel.groupKey.as_deref(), Some("chinese")); assert_eq!(channel.sortOrder, None); @@ -435,6 +947,9 @@ mod tests { #[test] fn builds_group_index() { + PROVIDER_RUNTIME_STATUS.remove("all"); + PROVIDER_RUNTIME_STATUS.remove("hsex"); + PROVIDER_RUNTIME_STATUS.remove("missav"); let channels = vec![ decorate_channel(base_channel("all")), decorate_channel(base_channel("hsex")), @@ -448,6 +963,7 @@ mod tests { #[test] fn decorates_pimpbunny_with_ytdlp_command() { + PROVIDER_RUNTIME_STATUS.remove("pimpbunny"); let channel = decorate_channel(base_channel("pimpbunny")); assert_eq!( channel.ytdlpCommand.as_deref(), @@ -457,6 +973,8 @@ mod tests { #[test] fn reflects_updated_group_moves() { + PROVIDER_RUNTIME_STATUS.remove("perverzija"); + PROVIDER_RUNTIME_STATUS.remove("rule34gen"); assert_eq!( decorate_channel(base_channel("perverzija")) .groupKey @@ -473,6 +991,10 @@ mod tests { #[test] fn status_response_uses_documented_group_keys() { + PROVIDER_RUNTIME_STATUS.remove("all"); + PROVIDER_RUNTIME_STATUS.remove("hsex"); + PROVIDER_RUNTIME_STATUS.remove("missav"); + PROVIDER_RUNTIME_STATUS.remove("pimpbunny"); let mut status = Status::new(); status.channels = vec![ base_channel("missav"), @@ -515,4 +1037,114 @@ mod tests { "yt-dlp --compat-options allow-unsafe-ext" ); } + + #[test] + fn runtime_error_status_overrides_channel_status() { + PROVIDER_RUNTIME_STATUS.insert("hsex".to_string(), CHANNEL_STATUS_ERROR.to_string()); + let channel = decorate_channel(base_channel("hsex")); + assert_eq!(channel.status, CHANNEL_STATUS_ERROR); + PROVIDER_RUNTIME_STATUS.remove("hsex"); + } + + #[ntex::test] + #[ignore = "live network sweep across all providers"] + async fn api_videos_returns_working_media_urls_for_all_channels() { + let app = test::init_service( + web::App::new() + .state(test_db_pool()) + .state(VideoCache::new().max_size(10_000).to_owned()) + .state(Requester::new()) + .service(web::scope("/api").configure(crate::api::config)), + ) + .await; + + let client_version = ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string()); + let mut channels = ALL_PROVIDERS + .iter() + .filter(|(provider_id, _)| **provider_id != "all") + .filter_map(|(_, provider)| provider.get_channel(client_version.clone())) + .collect::>(); + channels.sort_by(|a, b| a.id.cmp(&b.id)); + + let mut failures = Vec::new(); + let mut skipped = Vec::new(); + + for channel in channels { + let payload = request_for_channel(&channel); + let provider_id = channel.id.clone(); + + if let Some(reason) = skip_reason_for_provider(&provider_id) { + skipped.push(format!("{provider_id}: {reason}")); + continue; + } + + let request = test::TestRequest::post() + .uri("/api/videos") + .header( + header::USER_AGENT, + "Hot%20Tub/22c CFNetwork/1494.0.7 Darwin/23.4.0", + ) + .set_json(&payload) + .to_request(); + + let response = test::call_service(&app, request).await; + let status = response.status(); + let body = test::read_body(response).await; + + if !status.is_success() { + failures.push(format!( + "{provider_id} returned status {status}: {}", + String::from_utf8_lossy(&body) + )); + continue; + } + + let payload: ApiVideosResponse = match serde_json::from_slice(&body) { + Ok(payload) => payload, + Err(error) => { + failures.push(format!( + "{provider_id} returned invalid JSON: {error}; body={}", + String::from_utf8_lossy(&body) + )); + continue; + } + }; + + if payload.items.len() < 5 { + failures.push(format!( + "{provider_id} returned fewer than 5 items: {}", + payload.items.len() + )); + continue; + } + + for (item_index, item) in payload.items.iter().take(5).enumerate() { + let (url, headers) = media_target(item); + if url.is_empty() { + failures.push(format!( + "{provider_id} item {} returned an empty media url", + item_index + 1 + )); + break; + } + + if let Err(error) = + assert_media_response(&provider_id, item_index, &url, headers).await + { + failures.push(error); + break; + } + } + } + + assert!( + failures.is_empty(), + "provider live sweep failed:\n{}", + failures.join("\n") + ); + + if !skipped.is_empty() { + eprintln!("skipped providers:\n{}", skipped.join("\n")); + } + } } diff --git a/src/providers/pornmz.rs b/src/providers/pornmz.rs index 1fa5101..59175d1 100644 --- a/src/providers/pornmz.rs +++ b/src/providers/pornmz.rs @@ -1218,7 +1218,14 @@ mod tests { let item = provider.apply_detail_video(item, html).expect("detail"); assert_eq!(item.title, "Real Title"); assert_eq!(item.url, "https://cdn.example/master.m3u8"); - assert_eq!(item.formats.as_ref().and_then(|values| values.first()).map(|value| value.format.clone()).as_deref(), Some("m3u8")); + let first_format = item + .formats + .as_ref() + .and_then(|values| values.first()) + .expect("expected a parsed format"); + let first_format_json = + serde_json::to_value(first_format).expect("format should serialize"); + assert_eq!(first_format_json.get("format").and_then(|value| value.as_str()), Some("m3u8")); assert_eq!(item.duration, 1740); assert_eq!(item.views, Some(1400)); assert_eq!(item.uploader.as_deref(), Some("Kayley Gunner")); diff --git a/src/videos.rs b/src/videos.rs index 3a4c041..d62d649 100644 --- a/src/videos.rs +++ b/src/videos.rs @@ -431,6 +431,13 @@ impl VideoFormat { } self.to_owned() } + pub fn http_headers_pairs(&self) -> Vec<(String, String)> { + self.http_headers + .clone() + .unwrap_or_default() + .into_iter() + .collect() + } pub fn format_id(mut self, format_id: String) -> Self { self.format_id = Some(format_id); self