Files
hottub/src/providers/mod.rs
2026-03-31 13:39:11 +00:00

1635 lines
53 KiB
Rust

use async_trait::async_trait;
use dashmap::{DashMap, DashSet};
use futures::FutureExt;
use once_cell::sync::Lazy;
use rustc_hash::FxHashMap as HashMap;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use crate::{
DbPool,
api::ClientVersion,
status::{Channel, ChannelGroup, ChannelView, FilterOption, Status, StatusResponse},
uploaders::UploaderProfile,
util::{cache::VideoCache, discord::send_discord_error_report, requester::Requester},
videos::{FlexibleNumber, ServerOptions, VideoItem, VideosRequest},
};
include!(concat!(env!("OUT_DIR"), "/provider_selection.rs"));
include!(concat!(env!("OUT_DIR"), "/provider_modules.rs"));
/// Shared, reference-counted handle to a [`Provider`] trait object.
pub type DynProvider = Arc<dyn Provider>;
/// Static grouping metadata for a provider channel, looked up by `channel_metadata_for`.
#[derive(Clone, Copy)]
pub struct ProviderChannelMetadata {
/// Id of the channel group; `decorate_channel` copies it into `ChannelView::groupKey`.
pub group_id: &'static str,
/// Tags for the channel; `decorate_channel` exposes at most the first three.
pub tags: &'static [&'static str],
}
/// Registry of provider implementations keyed by provider id.
///
/// Built lazily on first access (see `init_providers_now` for eager initialisation).
pub static ALL_PROVIDERS: Lazy<HashMap<&'static str, DynProvider>> = Lazy::new(|| {
let mut m = HashMap::default();
// Included from OUT_DIR; presumably generated at build time to populate `m`.
include!(concat!(env!("OUT_DIR"), "/provider_registry.rs"));
m
});
/// Status string stored in `PROVIDER_RUNTIME_STATUS` for providers that keep failing validation.
const CHANNEL_STATUS_ERROR: &str = "error";
/// Number of items a provider must return during validation; also the number of items probed.
const VALIDATION_RESULTS_REQUIRED: usize = 5;
/// Minimum number of successful media probes for a validation run to pass.
const VALIDATION_MIN_SUCCESS: usize = 1;
/// Minimum time between two validation runs of one provider, and between two counted failures.
const VALIDATION_COOLDOWN: Duration = Duration::from_secs(3600);
/// Timeout handed to the requester for each media probe.
const VALIDATION_MEDIA_TIMEOUT: Duration = Duration::from_secs(100);
/// Period of the background task that re-schedules validation for errored providers.
const VALIDATION_ERROR_RETEST_INTERVAL: Duration = VALIDATION_COOLDOWN;
/// Number of counted failures after which a provider is marked with `CHANNEL_STATUS_ERROR`.
const VALIDATION_FAILURES_FOR_ERROR: u8 = 5;
/// Dependencies a validation run needs; stored once in `PROVIDER_VALIDATION_CONTEXT`.
#[derive(Clone)]
struct ProviderValidationContext {
/// Database pool passed to `Provider::get_videos`.
pool: DbPool,
/// Video cache passed to `Provider::get_videos`.
cache: VideoCache,
/// Requester used both by the provider and for the media probes.
requester: Requester,
}
/// Failure streak of one provider, maintained by `record_validation_failure`.
#[derive(Clone, Copy)]
struct ValidationFailureState {
/// Number of failures counted so far (saturating).
consecutive_failures: u8,
/// Instant at which the streak was last incremented.
last_counted_at: Instant,
}
/// Validation dependencies; set once by `configure_runtime_validation`.
static PROVIDER_VALIDATION_CONTEXT: OnceLock<ProviderValidationContext> = OnceLock::new();
/// Runtime status overrides keyed by provider id; read by `current_provider_channel_status`.
static PROVIDER_RUNTIME_STATUS: Lazy<DashMap<String, String>> = Lazy::new(DashMap::new);
/// Ids of providers whose validation task is currently running.
static PROVIDER_VALIDATION_INFLIGHT: Lazy<DashSet<String>> = Lazy::new(DashSet::new);
/// Instant at which validation was last scheduled per provider (cooldown bookkeeping).
static PROVIDER_VALIDATION_LAST_RUN: Lazy<DashMap<String, Instant>> = Lazy::new(DashMap::new);
/// Failure streak per provider id.
static PROVIDER_VALIDATION_FAILURE_STATE: Lazy<DashMap<String, ValidationFailureState>> =
Lazy::new(DashMap::new);
/// Set once the periodic re-validation task has been spawned.
static PROVIDER_ERROR_REVALIDATION_STARTED: OnceLock<()> = OnceLock::new();
/// Client version presented to providers when a channel is fetched for validation.
fn validation_client_version() -> ClientVersion {
    let client_name = String::from("Hot%20Tub");
    ClientVersion::new(22, u32::from('c'), client_name)
}
/// Picks the id of the `"all"` option when present, otherwise the id of the first option.
///
/// Returns `None` when `options` is empty.
fn preferred_option_value(options: &[FilterOption]) -> Option<String> {
    let chosen = match options.iter().find(|candidate| candidate.id == "all") {
        Some(all_option) => all_option,
        None => options.first()?,
    };
    Some(chosen.id.clone())
}
/// Returns the preferred value of the channel option named `option_id`.
///
/// Only the first option with a matching id is considered; `None` is returned when the channel
/// has no such option or the option offers no values.
fn channel_option_value(channel: &Channel, option_id: &str) -> Option<String> {
    for candidate in &channel.options {
        if candidate.id == option_id {
            return preferred_option_value(&candidate.options);
        }
    }
    None
}
/// Builds the first-page request used to validate `channel`.
///
/// Every filter the channel exposes is set to its preferred value (see
/// `preferred_option_value`); the sort order falls back to `"new"`.
fn validation_request_for_channel(channel: &Channel) -> VideosRequest {
    let preferred = |option_id: &str| channel_option_value(channel, option_id);
    let sort = preferred("sort").unwrap_or_else(|| "new".to_string());
    VideosRequest {
        // Fixed client identity.
        clientVersion: Some("2.1.4-22b".to_string()),
        languageCode: Some("en".to_string()),
        // Target channel and paging.
        channel: Some(channel.id.clone()),
        page: Some(FlexibleNumber::Int(1)),
        perPage: Some(FlexibleNumber::Int(VALIDATION_RESULTS_REQUIRED as u64)),
        sort: Some(sort),
        // Filters taken from the channel's own options.
        sexuality: preferred("sexuality"),
        featured: preferred("featured"),
        category: preferred("category"),
        sites: preferred("sites"),
        filter: preferred("filter"),
        language: preferred("language"),
        networks: preferred("networks"),
        stars: preferred("stars"),
        categories: preferred("categories"),
        duration: preferred("duration"),
        // Everything else is left unset.
        clientHash: None,
        blockedKeywords: None,
        countryCode: None,
        timestamp: None,
        blockedUploaders: None,
        anonId: None,
        debugTools: None,
        versionInstallDate: None,
        appInstallDate: None,
        server: None,
        query: None,
        all_provider_sites: None,
    }
}
/// Collects the URLs (with their request headers) that should be probed for `item`.
///
/// Every format with a non-blank URL comes first, followed by the item's own URL (without
/// headers) unless it is blank or already listed, compared ASCII-case-insensitively.
fn media_targets(item: &VideoItem) -> Vec<(String, Vec<(String, String)>)> {
    let mut targets: Vec<(String, Vec<(String, String)>)> = item
        .formats
        .iter()
        .flatten()
        .filter(|format| !format.url.trim().is_empty())
        .map(|format| (format.url.clone(), format.http_headers_pairs()))
        .collect();

    let fallback_url = item.url.as_str();
    let already_listed = targets
        .iter()
        .any(|(listed, _)| listed.eq_ignore_ascii_case(fallback_url));
    if !(fallback_url.trim().is_empty() || already_listed) {
        targets.push((item.url.clone(), Vec::new()));
    }
    targets
}
/// Heuristically decides whether a response carries media.
///
/// The content type is checked first (case-insensitively); otherwise the body must start with
/// an HLS playlist marker or contain an `ftyp` / `mdat` tag anywhere.
fn looks_like_media(content_type: &str, body: &[u8]) -> bool {
    const MEDIA_PREFIXES: [&str; 2] = ["video/", "audio/"];
    const MEDIA_FRAGMENTS: [&str; 3] = ["mpegurl", "mp2t", "octet-stream"];
    const BOX_TAGS: [&[u8]; 2] = [b"ftyp", b"mdat"];

    let normalized = content_type.to_ascii_lowercase();
    let header_says_media = MEDIA_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(*prefix))
        || MEDIA_FRAGMENTS
            .iter()
            .any(|fragment| normalized.contains(*fragment));
    if header_says_media {
        return true;
    }

    if body.starts_with(b"#EXTM3U") {
        return true;
    }
    body.windows(4)
        .any(|window| BOX_TAGS.iter().any(|tag| window == *tag))
}
/// Returns `true` when a validation error message matches one of the markers treated as
/// transient (connectivity problems, timeouts and selected HTTP statuses).
///
/// Matching is a case-insensitive substring search over the whole message.
fn is_transient_validation_error(error: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 13] = [
        "client error (connect)",
        "timed out",
        "timeout",
        "dns",
        "connection reset",
        "connection refused",
        "temporarily unavailable",
        "request returned 403",
        "request returned 429",
        "request returned 500",
        "request returned 502",
        "request returned 503",
        "request returned 504",
    ];
    let lowered = error.to_ascii_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
/// Requests the start of `url` and checks that the response plausibly is media.
///
/// A `Range: bytes=0-2047` header is added unless the caller already supplied a range header.
/// `item_index` is zero-based and reported one-based in messages.
///
/// # Errors
/// Returns a message when the request fails, the status is not a success, the body cannot be
/// read or is empty, or `looks_like_media` rejects the response.
async fn validate_media_response(
    provider_id: &str,
    item_index: usize,
    url: &str,
    mut headers: Vec<(String, String)>,
    requester: Requester,
) -> Result<(), String> {
    let item_number = item_index + 1;

    let has_range = headers
        .iter()
        .any(|(name, _)| name.eq_ignore_ascii_case("range"));
    if !has_range {
        headers.push((String::from("Range"), String::from("bytes=0-2047")));
    }

    let mut client = requester;
    let response = match client
        .get_raw_with_headers_timeout(url, headers, Some(VALIDATION_MEDIA_TIMEOUT))
        .await
    {
        Ok(response) => response,
        Err(err) => {
            return Err(format!(
                "{provider_id} item {} request failed for {url}: {err}",
                item_number
            ));
        }
    };

    let status = response.status();
    if !status.is_success() {
        return Err(format!(
            "{provider_id} item {} request returned {status} for {url}",
            item_number
        ));
    }

    // A missing or non-textual content type is treated as an empty string.
    let content_type = match response
        .headers()
        .get("content-type")
        .and_then(|value| value.to_str().ok())
    {
        Some(value) => value.to_string(),
        None => String::new(),
    };

    let body = match response.bytes().await {
        Ok(body) => body,
        Err(err) => {
            return Err(format!(
                "{provider_id} item {} body read failed for {url}: {err}",
                item_number
            ));
        }
    };
    if body.is_empty() {
        return Err(format!(
            "{provider_id} item {} returned an empty body for {url}",
            item_number
        ));
    }

    if looks_like_media(&content_type, body.as_ref()) {
        Ok(())
    } else {
        Err(format!(
            "{provider_id} item {} did not look like media for {url}; content-type={content_type:?}",
            item_number
        ))
    }
}
/// Runs one validation pass for `provider_id`.
///
/// The provider is asked for the first page of its channel (see
/// `validation_request_for_channel`) and up to `VALIDATION_RESULTS_REQUIRED` items are probed
/// through `validate_media_response`. The pass succeeds as soon as `VALIDATION_MIN_SUCCESS`
/// probes succeed.
///
/// A pass without any hard failure is inconclusive and reported as `Ok(())`. That covers
/// transient failures only, and also the case where every target was a relative URL (for
/// example a `/proxy/...` link built without a public base) so that no probe could run.
/// Previously the latter returned `Err` even though nothing had been checked.
///
/// # Errors
/// Returns a message when the validation context is not configured, the provider or its channel
/// is unavailable, fewer than `VALIDATION_RESULTS_REQUIRED` items are returned, or at least one
/// hard failure occurred without enough successful probes.
async fn run_provider_validation(provider_id: &str) -> Result<(), String> {
    let Some(context) = PROVIDER_VALIDATION_CONTEXT.get().cloned() else {
        return Err("provider validation context not configured".to_string());
    };
    let provider = ALL_PROVIDERS
        .get(provider_id)
        .cloned()
        .ok_or_else(|| format!("unknown provider: {provider_id}"))?;
    let channel = provider
        .get_channel(validation_client_version())
        .ok_or_else(|| format!("channel unavailable for provider: {provider_id}"))?;
    let request = validation_request_for_channel(&channel);
    let sort = request.sort.clone().unwrap_or_else(|| "date".to_string());
    let page = request
        .page
        .as_ref()
        .and_then(|value| value.to_u8())
        .unwrap_or(1);
    let per_page = request
        .perPage
        .as_ref()
        .and_then(|value| value.to_u8())
        .unwrap_or(VALIDATION_RESULTS_REQUIRED as u8);
    let options = ServerOptions {
        featured: request.featured.clone(),
        category: request.category.clone(),
        sites: request.sites.clone(),
        filter: request.filter.clone(),
        language: request.language.clone(),
        // No public base is available here, so proxied links come back relative.
        public_url_base: None,
        requester: Some(context.requester.clone()),
        network: request.networks.clone(),
        stars: request.stars.clone(),
        categories: request.categories.clone(),
        duration: request.duration.clone(),
        sort: Some(sort.clone()),
        sexuality: request.sexuality.clone(),
    };
    let items = run_provider_guarded(
        provider_id,
        "runtime_validation",
        provider.get_videos(
            context.cache.clone(),
            context.pool.clone(),
            sort,
            request.query.clone(),
            page.to_string(),
            per_page.to_string(),
            options,
        ),
    )
    .await;
    if items.len() < VALIDATION_RESULTS_REQUIRED {
        return Err(format!(
            "{provider_id} returned fewer than {VALIDATION_RESULTS_REQUIRED} items: {}",
            items.len()
        ));
    }

    let mut successes = 0usize;
    let mut hard_failures = Vec::new();
    let mut transient_failures = Vec::new();
    for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() {
        let targets = media_targets(item);
        if targets.is_empty() {
            hard_failures.push(format!(
                "{provider_id} item {} returned an empty media url",
                item_index + 1
            ));
            continue;
        }
        let mut item_errors = Vec::new();
        for (url, headers) in targets {
            // Relative URLs cannot be requested from here; they count neither way.
            if url.starts_with('/') {
                continue;
            }
            match validate_media_response(
                provider_id,
                item_index,
                &url,
                headers,
                context.requester.clone(),
            )
            .await
            {
                Ok(()) => {
                    successes += 1;
                    if successes >= VALIDATION_MIN_SUCCESS {
                        return Ok(());
                    }
                    // One working target is enough for this item.
                    item_errors.clear();
                    break;
                }
                Err(error) => item_errors.push(error),
            }
        }
        for error in item_errors {
            if is_transient_validation_error(&error) {
                transient_failures.push(error);
            } else {
                hard_failures.push(error);
            }
        }
    }

    if successes >= VALIDATION_MIN_SUCCESS {
        return Ok(());
    }
    // Without a hard failure there is no evidence that the provider is broken: either all
    // failures were transient or no target could be probed at all.
    if hard_failures.is_empty() {
        crate::flow_debug!(
            "provider validation inconclusive provider={} transient_failures={}",
            provider_id,
            transient_failures.len()
        );
        return Ok(());
    }
    Err(format!(
        "{provider_id} validation failed: only {successes} media checks passed (required at least {VALIDATION_MIN_SUCCESS}); hard_failures={}; transient_failures={}",
        hard_failures.join(" | "),
        transient_failures.join(" | ")
    ))
}
/// Forgets the failure streak recorded for `provider_id`, if any.
fn reset_validation_failure_state(provider_id: &str) {
PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
/// Records a validation failure for `provider_id` observed at `now` and returns the streak.
///
/// The first failure starts a streak of 1. Later failures only increment the streak when at
/// least `VALIDATION_COOLDOWN` has passed since the last counted one; the counter saturates.
///
/// The update goes through the map's entry API so that lookup and insertion happen under one
/// lock instead of a separate `get_mut` followed by `insert`.
fn record_validation_failure(provider_id: &str, now: Instant) -> u8 {
    let state = PROVIDER_VALIDATION_FAILURE_STATE
        .entry(provider_id.to_string())
        .and_modify(|state| {
            if now.duration_since(state.last_counted_at) >= VALIDATION_COOLDOWN {
                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
                state.last_counted_at = now;
            }
        })
        .or_insert(ValidationFailureState {
            consecutive_failures: 1,
            last_counted_at: now,
        });
    state.consecutive_failures
}
/// Spawns, at most once per process, a task that re-schedules validation for every provider
/// currently marked with `CHANNEL_STATUS_ERROR`, once per `VALIDATION_ERROR_RETEST_INTERVAL`.
fn start_periodic_error_revalidation() {
    let already_started = PROVIDER_ERROR_REVALIDATION_STARTED.set(()).is_err();
    if already_started {
        return;
    }
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(VALIDATION_ERROR_RETEST_INTERVAL);
        loop {
            ticker.tick().await;
            // Collect the ids first so the status map is not iterated while scheduling.
            let errored_providers: Vec<String> = PROVIDER_RUNTIME_STATUS
                .iter()
                .filter(|entry| entry.value().as_str() == CHANNEL_STATUS_ERROR)
                .map(|entry| entry.key().clone())
                .collect();
            errored_providers.iter().for_each(|provider_id| {
                schedule_provider_validation(
                    provider_id,
                    "periodic_retest",
                    "provider currently marked as error",
                );
            });
        }
    });
}
/// Installs the dependencies used by runtime validation and starts the periodic re-test task.
///
/// # Errors
/// Returns an error when the validation context was already configured.
pub fn configure_runtime_validation(
    pool: DbPool,
    cache: VideoCache,
    requester: Requester,
) -> Result<(), &'static str> {
    let context = ProviderValidationContext {
        pool,
        cache,
        requester,
    };
    if PROVIDER_VALIDATION_CONTEXT.set(context).is_err() {
        return Err("provider validation context already configured");
    }
    start_periodic_error_revalidation();
    Ok(())
}
/// Returns the runtime status override recorded for `provider_id`, if there is one.
pub fn current_provider_channel_status(provider_id: &str) -> Option<String> {
    let entry = PROVIDER_RUNTIME_STATUS.get(provider_id)?;
    Some(entry.value().clone())
}
pub fn schedule_provider_validation(provider_id: &str, context: &str, msg: &str) {
if !ALL_PROVIDERS.contains_key(provider_id) {
return;
}
if PROVIDER_VALIDATION_CONTEXT.get().is_none() {
return;
}
if let Some(last_run) = PROVIDER_VALIDATION_LAST_RUN.get(provider_id) {
if last_run.elapsed() < VALIDATION_COOLDOWN {
crate::flow_debug!(
"provider validation skip cooldown provider={} context={} error={}",
provider_id,
context,
crate::util::flow_debug::preview(msg, 120)
);
return;
}
}
let provider_id = provider_id.to_string();
if !PROVIDER_VALIDATION_INFLIGHT.insert(provider_id.clone()) {
return;
}
PROVIDER_VALIDATION_LAST_RUN.insert(provider_id.clone(), Instant::now());
let _context_name = context.to_string();
let _error_preview = msg.to_string();
tokio::spawn(async move {
crate::flow_debug!(
"provider validation start provider={} context={} error={}",
&provider_id,
&_context_name,
crate::util::flow_debug::preview(&_error_preview, 120)
);
let validation_result = run_provider_validation(&provider_id).await;
match validation_result {
Ok(()) => {
reset_validation_failure_state(&provider_id);
PROVIDER_RUNTIME_STATUS.remove(&provider_id);
}
Err(_validation_error) => {
let failures = record_validation_failure(&provider_id, Instant::now());
if failures >= VALIDATION_FAILURES_FOR_ERROR {
PROVIDER_RUNTIME_STATUS
.insert(provider_id.clone(), CHANNEL_STATUS_ERROR.to_string());
}
crate::flow_debug!(
"provider validation failed provider={} failures={} threshold={} error={}",
&provider_id,
failures,
VALIDATION_FAILURES_FOR_ERROR,
crate::util::flow_debug::preview(&_validation_error, 160)
);
}
}
PROVIDER_VALIDATION_INFLIGHT.remove(&provider_id);
});
}
/// Forces construction of the `ALL_PROVIDERS` registry instead of waiting for first use.
pub fn init_providers_now() {
// Idempotent & thread-safe: runs the Lazy init exactly once.
crate::flow_debug!(
"provider init selection={:?}",
compile_time_selected_provider()
);
Lazy::force(&ALL_PROVIDERS);
}
/// Returns the value of `COMPILE_TIME_SELECTED_PROVIDER`.
///
/// The constant is not defined in this part of the file; presumably it comes from the included
/// `provider_selection.rs` and names the single provider a build was restricted to.
pub fn compile_time_selected_provider() -> Option<&'static str> {
COMPILE_TIME_SELECTED_PROVIDER
}
/// Maps the channel `"all"` to the compile-time selected provider when one is set.
///
/// Any other channel name, and `"all"` when no provider is selected, is returned unchanged.
pub fn resolve_provider_for_build<'a>(channel: &'a str) -> &'a str {
    if channel != "all" {
        return channel;
    }
    compile_time_selected_provider().unwrap_or(channel)
}
/// Extracts a readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as text; anything else yields
/// `"unknown panic payload"`.
pub fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
    let payload = match payload.downcast::<&str>() {
        Ok(text) => return (*text).to_string(),
        Err(other) => other,
    };
    match payload.downcast::<String>() {
        Ok(text) => *text,
        Err(_) => "unknown panic payload".to_string(),
    }
}
/// Awaits a provider future and converts a panic into an empty result.
///
/// When `fut` panics, the panic is reported through `send_discord_error_report`, a validation
/// run is scheduled for the provider and an empty list is returned.
pub async fn run_provider_guarded<F>(provider_name: &str, context: &str, fut: F) -> Vec<VideoItem>
where
    F: Future<Output = Vec<VideoItem>>,
{
    crate::flow_debug!(
        "provider guard enter provider={} context={}",
        provider_name,
        context
    );
    let payload = match AssertUnwindSafe(fut).catch_unwind().await {
        Ok(videos) => {
            crate::flow_debug!(
                "provider guard exit provider={} context={} videos={}",
                provider_name,
                context,
                videos.len()
            );
            return videos;
        }
        Err(payload) => payload,
    };

    let panic_msg = panic_payload_to_string(payload);
    crate::flow_debug!(
        "provider guard panic provider={} context={} panic={}",
        provider_name,
        context,
        &panic_msg
    );
    let details = format!("context={}; panic={}", context, panic_msg);
    // Reporting is best effort; its outcome is ignored.
    let _ = send_discord_error_report(
        format!("Provider panic: {}", provider_name),
        None,
        Some("Provider Guard"),
        Some(&details),
        file!(),
        line!(),
        module_path!(),
    )
    .await;
    schedule_provider_validation(provider_name, context, &panic_msg);
    Vec::new()
}
/// Awaits an uploader lookup future and converts a panic into an `Err`.
///
/// When `fut` panics, the panic is reported through `send_discord_error_report`, a validation
/// run is scheduled for the provider and the panic message is returned as the error.
pub async fn run_uploader_provider_guarded<F>(
    provider_name: &str,
    context: &str,
    fut: F,
) -> Result<Option<UploaderProfile>, String>
where
    F: Future<Output = Result<Option<UploaderProfile>, String>>,
{
    crate::flow_debug!(
        "provider uploader guard enter provider={} context={}",
        provider_name,
        context
    );
    let payload = match AssertUnwindSafe(fut).catch_unwind().await {
        Ok(result) => {
            crate::flow_debug!(
                "provider uploader guard exit provider={} context={} matched={}",
                provider_name,
                context,
                matches!(result, Ok(Some(_)))
            );
            return result;
        }
        Err(payload) => payload,
    };

    let panic_msg = panic_payload_to_string(payload);
    crate::flow_debug!(
        "provider uploader guard panic provider={} context={} panic={}",
        provider_name,
        context,
        &panic_msg
    );
    let details = format!("context={}; panic={}", context, panic_msg);
    // Reporting is best effort; its outcome is ignored.
    let _ = send_discord_error_report(
        format!("Provider panic: {}", provider_name),
        None,
        Some("Provider Guard"),
        Some(&details),
        file!(),
        line!(),
        module_path!(),
    )
    .await;
    schedule_provider_validation(provider_name, context, &panic_msg);
    Err(panic_msg)
}
/// Reports a provider error and schedules a validation run for that provider.
///
/// The outcome of the report itself is ignored.
pub async fn report_provider_error(provider_name: &str, context: &str, msg: &str) {
    let title = format!("Provider error: {}", provider_name);
    let details = format!("context={}; error={}", context, msg);
    let _ = send_discord_error_report(
        title,
        None,
        Some("Provider Guard"),
        Some(&details),
        file!(),
        line!(),
        module_path!(),
    )
    .await;
    schedule_provider_validation(provider_name, context, msg);
}
/// Fire-and-forget variant of `report_provider_error`: the report runs on a spawned task.
pub fn report_provider_error_background(provider_name: &str, context: &str, msg: &str) {
    // The task outlives the borrowed arguments, so it gets owned copies.
    let (provider_name, context, msg) = (
        provider_name.to_owned(),
        context.to_owned(),
        msg.to_owned(),
    );
    tokio::spawn(async move {
        report_provider_error(&provider_name, &context, &msg).await;
    });
}
/// Returns a clone of the requester carried by `options`, or a fresh default one.
///
/// A missing requester is reported in the background as a provider error before the fallback
/// `Requester::new()` is returned.
pub fn requester_or_default(
    options: &ServerOptions,
    provider_name: &str,
    context: &str,
) -> Requester {
    if let Some(requester) = options.requester.clone() {
        crate::flow_debug!(
            "provider requester existing provider={} context={} trace={}",
            provider_name,
            context,
            requester.debug_trace_id().unwrap_or("none")
        );
        return requester;
    }

    crate::flow_debug!(
        "provider requester fallback provider={} context={}",
        provider_name,
        context
    );
    report_provider_error_background(
        provider_name,
        context,
        "ServerOptions.requester missing; using default Requester",
    );
    Requester::new()
}
/// Removes a leading `https://` or `http://` (case-sensitive) and any leading slashes that
/// remain afterwards.
pub fn strip_url_scheme(url: &str) -> String {
    let without_scheme = if let Some(rest) = url.strip_prefix("https://") {
        rest
    } else if let Some(rest) = url.strip_prefix("http://") {
        rest
    } else {
        url
    };
    without_scheme.trim_start_matches('/').to_owned()
}
/// Builds the URL of `target` behind the proxy endpoint named `proxy`.
///
/// The result is prefixed with `options.public_url_base` (without trailing slashes) when that
/// is set and non-empty; otherwise a server-relative `/proxy/...` path is returned. Leading
/// slashes of `target` are dropped.
pub fn build_proxy_url(options: &ServerOptions, proxy: &str, target: &str) -> String {
    let target = target.trim_start_matches('/');
    let public_base = options
        .public_url_base
        .as_deref()
        .map(|raw| raw.trim_end_matches('/'));
    match public_base {
        Some(base) if !base.is_empty() => format!("{base}/proxy/{proxy}/{target}"),
        _ => format!("/proxy/{proxy}/{target}"),
    }
}
/// Looks up the grouping metadata for the channel `id`.
///
/// The body is included from `provider_metadata_fn.rs` in `OUT_DIR` and is not visible here;
/// presumably it returns `None` for channels without metadata.
fn channel_metadata_for(id: &str) -> Option<ProviderChannelMetadata> {
include!(concat!(env!("OUT_DIR"), "/provider_metadata_fn.rs"))
}
/// Returns the display title of a channel group; unknown ids map to `"Other"`.
fn channel_group_title(group_id: &str) -> &'static str {
    static TITLES: [(&str, &str); 14] = [
        ("meta-search", "Meta Search"),
        ("mainstream-tube", "Mainstream Tube"),
        ("tiktok", "Tiktok"),
        ("studio-network", "Studio & Network"),
        ("amateur-homemade", "Amateur & Homemade"),
        ("onlyfans", "OnlyFans"),
        ("chinese", "Chinese"),
        ("jav", "JAV"),
        ("fetish-kink", "Fetish & Kink"),
        ("hentai-animation", "Hentai & Animation"),
        ("ai", "AI"),
        ("gay-male", "Gay & Male"),
        ("live-cams", "Live Cams"),
        ("pmv-compilation", "PMV & Compilation"),
    ];
    for (id, title) in TITLES.iter() {
        if *id == group_id {
            return title;
        }
    }
    "Other"
}
/// Returns the system image name of a channel group: `"globe"` for `"jav"` and `"chinese"`,
/// nothing for every other group.
fn channel_group_system_image(group_id: &str) -> Option<&'static str> {
    if group_id == "jav" || group_id == "chinese" {
        Some("globe")
    } else {
        None
    }
}
/// Returns the sort rank of a channel group; lower ranks are listed first.
///
/// The ranks follow the order of the titles in `channel_group_title`. `"amateur-homemade"`
/// used to be missing here and therefore sorted together with unknown groups; it now ranks
/// right after `"studio-network"`. Unknown ids keep rank 99.
fn channel_group_order(group_id: &str) -> usize {
    match group_id {
        "meta-search" => 0,
        "mainstream-tube" => 1,
        "tiktok" => 2,
        "studio-network" => 3,
        "amateur-homemade" => 4,
        "onlyfans" => 5,
        "chinese" => 6,
        "jav" => 7,
        "fetish-kink" => 8,
        "hentai-animation" => 9,
        "ai" => 10,
        "gay-male" => 11,
        "live-cams" => 12,
        "pmv-compilation" => 13,
        _ => 99,
    }
}
/// Turns a provider `Channel` into the `ChannelView` sent to clients.
///
/// Adds the group key and up to three tags from the channel metadata, replaces the status with
/// the runtime override when one exists, and attaches the yt-dlp command for `"pimpbunny"`.
/// `sortOrder` is left unset.
pub fn decorate_channel(channel: Channel) -> ChannelView {
    let metadata = channel_metadata_for(&channel.id);
    let status = match current_provider_channel_status(&channel.id) {
        Some(runtime_status) => runtime_status,
        None => channel.status,
    };
    let ytdlp_command = if channel.id == "pimpbunny" {
        Some("yt-dlp --compat-options allow-unsafe-ext".to_string())
    } else {
        None
    };
    ChannelView {
        groupKey: metadata.map(|value| value.group_id.to_string()),
        tags: metadata.map(|value| {
            value
                .tags
                .iter()
                .take(3)
                .map(|tag| (*tag).to_string())
                .collect()
        }),
        sortOrder: None,
        ytdlpCommand: ytdlp_command,
        status,
        id: channel.id,
        name: channel.name,
        description: channel.description,
        premium: channel.premium,
        favicon: channel.favicon,
        categories: channel.categories,
        options: channel.options,
        nsfw: channel.nsfw,
        cacheDuration: channel.cacheDuration,
    }
}
/// Builds the group index for `channels`.
///
/// Groups are ordered by `channel_group_order`, then by id. Within a group the channel ids are
/// ordered by `sortOrder` (unset last), then name, then id. Channels without a group key are
/// not listed.
pub fn build_channel_groups(channels: &[ChannelView]) -> Vec<ChannelGroup> {
    let mut group_ids: Vec<String> = channels
        .iter()
        .filter_map(|channel| channel.groupKey.clone())
        .collect();
    group_ids.sort_by(|left, right| {
        channel_group_order(left)
            .cmp(&channel_group_order(right))
            .then_with(|| left.cmp(right))
    });
    // Equal ids are adjacent after sorting, so `dedup` leaves each group once.
    group_ids.dedup();

    group_ids
        .into_iter()
        .map(|group_id| {
            let mut members: Vec<&ChannelView> = channels
                .iter()
                .filter(|channel| channel.groupKey.as_deref() == Some(group_id.as_str()))
                .collect();
            members.sort_by(|a, b| {
                let key_a = (a.sortOrder.unwrap_or(u32::MAX), &a.name, &a.id);
                let key_b = (b.sortOrder.unwrap_or(u32::MAX), &b.name, &b.id);
                key_a.cmp(&key_b)
            });
            let channel_ids = members
                .into_iter()
                .map(|channel| channel.id.clone())
                .collect::<Vec<_>>();
            ChannelGroup {
                title: channel_group_title(&group_id).to_string(),
                systemImage: channel_group_system_image(&group_id).map(str::to_string),
                channelIds: channel_ids,
                id: group_id,
            }
        })
        .collect()
}
/// Assigns a 1-based `sortOrder` to every channel.
///
/// Channels are ranked by group order, group key (missing keys count as `""`), ASCII-lowercased
/// name and ASCII-lowercased id. Their position in the slice is not changed.
fn assign_channel_sort_order(channels: &mut [ChannelView]) {
    let keys: Vec<(usize, &str, String, String)> = channels
        .iter()
        .map(|channel| {
            let group = channel.groupKey.as_deref().unwrap_or("");
            (
                channel_group_order(group),
                group,
                channel.name.to_ascii_lowercase(),
                channel.id.to_ascii_lowercase(),
            )
        })
        .collect();

    // Sort slice positions rather than the channels themselves; the sort is stable.
    let mut ranking: Vec<usize> = (0..channels.len()).collect();
    ranking.sort_by(|&left, &right| keys[left].cmp(&keys[right]));

    for (rank, channel_index) in ranking.into_iter().enumerate() {
        channels[channel_index].sortOrder = Some((rank + 1) as u32);
    }
}
pub fn build_status_response(status: Status) -> StatusResponse {
let mut channels = status
.channels
.into_iter()
.map(decorate_channel)
.collect::<Vec<_>>();
assign_channel_sort_order(&mut channels);
let channelGroups = build_channel_groups(&channels);
crate::flow_debug!(
"status response build channels={} groups={}",
channels.len(),
channelGroups.len()
);
StatusResponse {
id: status.id,
name: status.name,
subtitle: status.subtitle,
description: status.description,
iconUrl: status.iconUrl,
color: status.color,
status: status.status,
notices: status.notices,
channels,
channelGroups,
subscription: status.subscription,
nsfw: status.nsfw,
categories: status.categories,
options: status.options,
filtersFooter: status.filtersFooter,
}
}
#[async_trait]
pub trait Provider: Send + Sync {
async fn get_videos(
&self,
cache: VideoCache,
pool: DbPool,
sort: String,
query: Option<String>,
page: String,
per_page: String,
options: ServerOptions,
) -> Vec<VideoItem>;
fn get_channel(&self, clientversion: ClientVersion) -> Option<Channel> {
println!(
"Getting channel for placeholder with client version: {:?}",
clientversion
);
let _ = clientversion;
Some(Channel {
id: "placeholder".to_string(),
name: "PLACEHOLDER".to_string(),
description: "PLACEHOLDER FOR PARENT CLASS".to_string(),
premium: false,
favicon: "https://www.google.com/s2/favicons?sz=64&domain=missav.ws".to_string(),
status: "active".to_string(),
categories: vec![],
options: vec![],
nsfw: true,
cacheDuration: None,
})
}
async fn get_uploader(
&self,
_cache: VideoCache,
_pool: DbPool,
_uploader_id: Option<String>,
_uploader_name: Option<String>,
_query: Option<String>,
_profile_content: bool,
_options: ServerOptions,
) -> Result<Option<UploaderProfile>, String> {
Ok(None)
}
}
#[cfg(all(test, not(hottub_single_provider)))]
mod tests {
use super::*;
use crate::api::ClientVersion;
use crate::status::{ChannelOption, FilterOption};
use crate::util::cache::VideoCache;
use crate::util::requester::Requester;
use crate::videos::{FlexibleNumber, VideosRequest};
use diesel::{
SqliteConnection,
r2d2::{self, ConnectionManager},
};
use ntex::http::header;
use ntex::web::{self, test};
use serde::Deserialize;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
/// Subset of a videos API response that the tests deserialize.
#[derive(Debug, Deserialize)]
struct ApiVideosResponse {
items: Vec<ApiVideoItem>,
}
/// One video of an `ApiVideosResponse`.
#[derive(Debug, Deserialize)]
struct ApiVideoItem {
// A missing title deserializes to an empty string.
#[serde(default)]
title: String,
url: String,
formats: Option<Vec<ApiVideoFormat>>,
}
/// One playable format of an `ApiVideoItem`, with optional request headers.
#[derive(Debug, Deserialize)]
struct ApiVideoFormat {
url: String,
http_headers: Option<HashMap<String, String>>,
}
/// Builds a minimal active channel whose id and name are both `id`.
fn base_channel(id: &str) -> Channel {
    let label = id.to_owned();
    Channel {
        name: label.clone(),
        id: label,
        description: String::new(),
        favicon: String::new(),
        status: String::from("active"),
        premium: false,
        nsfw: true,
        categories: Vec::new(),
        options: Vec::<ChannelOption>::new(),
        cacheDuration: None,
    }
}
/// Creates a connection pool on a SQLite file in the temp directory, named after the current
/// time in nanoseconds.
fn test_db_pool() -> DbPool {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time should be after unix epoch");
    let unique = since_epoch.as_nanos();
    let mut database_path = std::env::temp_dir();
    database_path.push(format!("hottub-providers-test-{unique}.sqlite"));
    let database_url = database_path.to_string_lossy().into_owned();
    let manager = ConnectionManager::<SqliteConnection>::new(database_url);
    let builder = r2d2::Pool::builder().max_size(8);
    builder.build(manager).expect("test db pool should build")
}
/// Test copy of the parent module's helper: the id of the `"all"` option when present,
/// otherwise the id of the first option, `None` for an empty list.
fn preferred_option_value(options: &[FilterOption]) -> Option<String> {
    let all_option = options.iter().find(|candidate| candidate.id == "all");
    let chosen = match all_option {
        Some(found) => Some(found),
        None => options.first(),
    };
    chosen.map(|candidate| candidate.id.clone())
}
/// Test copy of the parent module's helper: the preferred value of the first channel option
/// whose id is `option_id`.
fn channel_option_value(channel: &Channel, option_id: &str) -> Option<String> {
    let position = channel
        .options
        .iter()
        .position(|candidate| candidate.id == option_id)?;
    preferred_option_value(&channel.options[position].options)
}
/// Builds a first-page request (five items) for `channel`, with every filter the channel
/// exposes set to its preferred value and the sort order falling back to `"new"`.
fn request_for_channel(channel: &Channel) -> VideosRequest {
    let preferred = |option_id: &str| channel_option_value(channel, option_id);
    let sort = preferred("sort").unwrap_or_else(|| "new".to_string());
    VideosRequest {
        // Fixed client identity.
        clientVersion: Some("2.1.4-22b".to_string()),
        languageCode: Some("en".to_string()),
        // Target channel and paging.
        channel: Some(channel.id.clone()),
        page: Some(FlexibleNumber::Int(1)),
        perPage: Some(FlexibleNumber::Int(5)),
        sort: Some(sort),
        // Filters taken from the channel's own options.
        sexuality: preferred("sexuality"),
        featured: preferred("featured"),
        category: preferred("category"),
        sites: preferred("sites"),
        filter: preferred("filter"),
        language: preferred("language"),
        networks: preferred("networks"),
        stars: preferred("stars"),
        categories: preferred("categories"),
        duration: preferred("duration"),
        // Everything else is left unset.
        clientHash: None,
        blockedKeywords: None,
        countryCode: None,
        timestamp: None,
        blockedUploaders: None,
        anonId: None,
        debugTools: None,
        versionInstallDate: None,
        appInstallDate: None,
        server: None,
        query: None,
        all_provider_sites: None,
    }
}
fn request_for_channel_with_query(channel: &Channel, query: String) -> VideosRequest {
let mut request = request_for_channel(channel);
request.query = Some(query);
request
}
/// Derives candidate search queries from the titles of `items`.
///
/// Each whitespace-separated token is reduced to its alphanumeric characters and kept when the
/// result is at least three bytes long and not already listed (ASCII-case-insensitively).
/// `"yesporn"` always starts with `"anal"`; `"video"` is the fallback when nothing was found.
fn search_queries_for_channel(provider_id: &str, items: &[ApiVideoItem]) -> Vec<String> {
    let mut candidates: Vec<String> = Vec::new();
    if provider_id == "yesporn" {
        candidates.push("anal".to_string());
    }

    let tokens = items
        .iter()
        .flat_map(|item| item.title.split_whitespace());
    for token in tokens {
        let cleaned: String = token.chars().filter(|ch| ch.is_alphanumeric()).collect();
        if cleaned.len() < 3 {
            continue;
        }
        let already_listed = candidates
            .iter()
            .any(|existing| existing.eq_ignore_ascii_case(&cleaned));
        if !already_listed {
            candidates.push(cleaned);
        }
    }

    if candidates.is_empty() {
        candidates.push("video".to_string());
    }
    candidates
}
/// Explains why a provider is skipped, or returns `None` when it should run.
///
/// Only the listed providers are skipped, and only while the `FLARE_URL` environment variable
/// is not set.
fn skip_reason_for_provider(provider_id: &str) -> Option<&'static str> {
    const NEEDS_FLARE: [&str; 5] = [
        "hentaihaven",
        "homoxxx",
        "okporn",
        "okxxx",
        "perfectgirls",
    ];
    let flare_configured = std::env::var("FLARE_URL").is_ok();
    let needs_flare = NEEDS_FLARE.iter().any(|id| *id == provider_id);
    if !flare_configured && needs_flare {
        Some("requires FLARE_URL for live requests in this environment")
    } else {
        None
    }
}
/// Returns whether `provider_id` passes the `HOTTUB_TEST_PROVIDER` filter.
///
/// An unset (or unreadable) or blank variable matches every provider; otherwise the trimmed
/// value must equal `provider_id`.
fn provider_filter_matches(provider_id: &str) -> bool {
    let Ok(filter) = std::env::var("HOTTUB_TEST_PROVIDER") else {
        return true;
    };
    let wanted = filter.trim();
    wanted.is_empty() || wanted == provider_id
}
/// Picks the URL to probe for `item`: its first format (with that format's headers) when any
/// format exists, otherwise the item's own URL without headers.
fn media_target(item: &ApiVideoItem) -> (String, Vec<(String, String)>) {
    let first_format = item
        .formats
        .as_deref()
        .and_then(|formats| formats.first());
    match first_format {
        Some(format) => {
            let headers = format
                .http_headers
                .iter()
                .flatten()
                .map(|(name, value)| (name.clone(), value.clone()))
                .collect();
            (format.url.clone(), headers)
        }
        None => (item.url.clone(), Vec::new()),
    }
}
/// Test copy of the parent module's heuristic: a media content type, an HLS playlist marker at
/// the start of the body, or an `ftyp` / `mdat` tag anywhere in the body.
fn looks_like_media(content_type: &str, body: &[u8]) -> bool {
    let normalized = content_type.to_ascii_lowercase();
    let by_prefix = ["video/", "audio/"]
        .iter()
        .any(|prefix| normalized.starts_with(*prefix));
    let by_fragment = ["mpegurl", "mp2t", "octet-stream"]
        .iter()
        .any(|fragment| normalized.contains(*fragment));
    if by_prefix || by_fragment {
        return true;
    }

    let is_playlist = body.starts_with(b"#EXTM3U");
    is_playlist
        || body
            .windows(4)
            .any(|window| window == b"ftyp" || window == b"mdat")
}
/// Requests the start of `url` with a fresh `Requester` and checks that the response plausibly
/// is media.
///
/// A `Range: bytes=0-2047` header is added unless one is already present. `item_index` is
/// zero-based and reported one-based in the error messages.
async fn assert_media_response(
    provider_id: &str,
    item_index: usize,
    url: &str,
    mut headers: Vec<(String, String)>,
) -> Result<(), String> {
    let item_number = item_index + 1;

    let has_range = headers
        .iter()
        .any(|(name, _)| name.eq_ignore_ascii_case("range"));
    if !has_range {
        headers.push((String::from("Range"), String::from("bytes=0-2047")));
    }

    let mut requester = Requester::new();
    let response = match requester
        .get_raw_with_headers_timeout(url, headers, Some(VALIDATION_MEDIA_TIMEOUT))
        .await
    {
        Ok(response) => response,
        Err(err) => {
            return Err(format!(
                "{provider_id} item {} request failed for {url}: {err}",
                item_number
            ));
        }
    };

    let status = response.status();
    if !status.is_success() {
        return Err(format!(
            "{provider_id} item {} request returned {status} for {url}",
            item_number
        ));
    }

    // A missing or non-textual content type is treated as an empty string.
    let content_type = match response
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
    {
        Some(value) => value.to_string(),
        None => String::new(),
    };

    let body = match response.bytes().await {
        Ok(body) => body,
        Err(err) => {
            return Err(format!(
                "{provider_id} item {} body read failed for {url}: {err}",
                item_number
            ));
        }
    };
    if body.is_empty() {
        return Err(format!(
            "{provider_id} item {} returned an empty body for {url}",
            item_number
        ));
    }

    if looks_like_media(&content_type, body.as_ref()) {
        Ok(())
    } else {
        Err(format!(
            "{provider_id} item {} did not look like media for {url}; content-type={content_type:?}",
            item_number
        ))
    }
}
// `decorate_channel` attaches the group key and tags for "hsex" and sets neither a sort order
// nor a yt-dlp command.
#[test]
fn decorates_channel_with_group_and_tags() {
// Clear any runtime status override for this provider before decorating.
PROVIDER_RUNTIME_STATUS.remove("hsex");
let channel = decorate_channel(base_channel("hsex"));
assert_eq!(channel.groupKey.as_deref(), Some("chinese"));
assert_eq!(channel.sortOrder, None);
assert_eq!(channel.ytdlpCommand, None);
assert_eq!(
channel.tags.as_deref(),
Some(
&[
"amateur".to_string(),
"chinese".to_string(),
"homemade".to_string(),
][..]
)
);
}
// Failures closer together than `VALIDATION_COOLDOWN` must not extend the streak.
#[test]
fn validation_failure_streak_requires_hourly_spacing() {
    // The failure map is a process-wide static and tests run in parallel, so this test uses a
    // key of its own instead of sharing "hsex" with the other streak tests.
    let provider_id = "test::validation_failure_streak_requires_hourly_spacing";
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
    let now = Instant::now();
    assert_eq!(record_validation_failure(provider_id, now), 1);
    // A second failure at the same instant is not counted.
    assert_eq!(record_validation_failure(provider_id, now), 1);
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN),
        2
    );
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN * 2),
        3
    );
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
// Resetting the failure state makes the next failure start a new streak.
#[test]
fn validation_failure_streak_resets_after_success() {
    // The failure map is a process-wide static and tests run in parallel, so this test uses a
    // key of its own instead of sharing "hsex" with the other streak tests.
    let provider_id = "test::validation_failure_streak_resets_after_success";
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
    let now = Instant::now();
    assert_eq!(record_validation_failure(provider_id, now), 1);
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN),
        2
    );
    reset_validation_failure_state(provider_id);
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN * 2),
        1
    );
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
// `VALIDATION_FAILURES_FOR_ERROR` failures spaced one cooldown apart reach the error threshold.
#[test]
fn validation_failure_threshold_matches_channel_error_policy() {
    // The failure map is a process-wide static and tests run in parallel, so this test uses a
    // key of its own instead of sharing "hsex" with the other streak tests.
    let provider_id = "test::validation_failure_threshold_matches_channel_error_policy";
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
    let now = Instant::now();
    let mut counted = 0;
    for step in 0..VALIDATION_FAILURES_FOR_ERROR {
        let offset = VALIDATION_COOLDOWN * u32::from(step);
        counted = record_validation_failure(provider_id, now + offset);
    }
    assert_eq!(counted, VALIDATION_FAILURES_FOR_ERROR);
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
// Groups come out in `channel_group_order` order: meta-search, chinese, jav.
#[test]
fn builds_group_index() {
// Clear runtime status overrides for the providers used below.
PROVIDER_RUNTIME_STATUS.remove("all");
PROVIDER_RUNTIME_STATUS.remove("hsex");
PROVIDER_RUNTIME_STATUS.remove("missav");
let channels = vec![
decorate_channel(base_channel("all")),
decorate_channel(base_channel("hsex")),
decorate_channel(base_channel("missav")),
];
let groups = build_channel_groups(&channels);
assert_eq!(groups[0].id, "meta-search");
assert_eq!(groups[1].id, "chinese");
assert_eq!(groups[2].id, "jav");
}
// The "pimpbunny" channel carries its dedicated yt-dlp command.
#[test]
fn decorates_pimpbunny_with_ytdlp_command() {
PROVIDER_RUNTIME_STATUS.remove("pimpbunny");
let channel = decorate_channel(base_channel("pimpbunny"));
assert_eq!(
channel.ytdlpCommand.as_deref(),
Some("yt-dlp --compat-options allow-unsafe-ext")
);
}
// "perverzija" belongs to studio-network and "rule34gen" to ai.
#[test]
fn reflects_updated_group_moves() {
PROVIDER_RUNTIME_STATUS.remove("perverzija");
PROVIDER_RUNTIME_STATUS.remove("rule34gen");
assert_eq!(
decorate_channel(base_channel("perverzija"))
.groupKey
.as_deref(),
Some("studio-network")
);
assert_eq!(
decorate_channel(base_channel("rule34gen"))
.groupKey
.as_deref(),
Some("ai")
);
}
/// Serialises the status response for four channels to JSON and checks the
/// field names and values the test treats as the documented wire format:
///
/// * the "all" channel has `groupKey` "meta-search", no `group` field, and a
///   numeric `sortOrder`;
/// * the "meta-search" group lists its members under `channelIds` (exactly
///   `["all"]`) and has no `channels` field;
/// * the "chinese" group has `systemImage` "globe";
/// * the "pimpbunny" channel carries the expected `ytdlpCommand`.
#[test]
fn status_response_uses_documented_group_keys() {
    /// Returns the first JSON entry whose `"id"` field equals `id`.
    fn entry_with_id<'a>(
        entries: &'a [serde_json::Value],
        id: &str,
    ) -> Option<&'a serde_json::Value> {
        entries.iter().find(|entry| entry["id"] == id)
    }

    // Drop any runtime-status entries for the providers used below.
    for provider_id in ["all", "hsex", "missav", "pimpbunny"] {
        PROVIDER_RUNTIME_STATUS.remove(provider_id);
    }

    let mut status = Status::new();
    status.channels = ["missav", "hsex", "all", "pimpbunny"]
        .iter()
        .copied()
        .map(base_channel)
        .collect();

    let response = build_status_response(status);
    let json = serde_json::to_value(response).expect("valid status json");

    // Channel-level expectations for the "all" channel.
    let channels = json["channels"].as_array().expect("channels array");
    let all_channel = entry_with_id(channels, "all").expect("all channel present");
    assert_eq!(all_channel["groupKey"], "meta-search");
    assert!(all_channel.get("group").is_none());
    assert!(all_channel["sortOrder"].is_number());

    // Group-level expectations.
    let groups = json["channelGroups"].as_array().expect("group array");
    let meta_group = entry_with_id(groups, "meta-search").expect("meta group present");
    assert_eq!(meta_group["channelIds"], serde_json::json!(["all"]));
    assert!(meta_group.get("channels").is_none());

    let chinese_group = entry_with_id(groups, "chinese").expect("chinese group present");
    assert_eq!(chinese_group["systemImage"], "globe");

    // Channel-level expectation for the "pimpbunny" channel.
    let pimpbunny_channel =
        entry_with_id(channels, "pimpbunny").expect("pimpbunny channel present");
    assert_eq!(
        pimpbunny_channel["ytdlpCommand"],
        "yt-dlp --compat-options allow-unsafe-ext"
    );
}
/// Verifies that an entry in `PROVIDER_RUNTIME_STATUS` set to
/// `CHANNEL_STATUS_ERROR` is reflected in the `status` of the channel
/// returned by `decorate_channel`.
///
/// The override lives in a process-wide map, so it is removed by a drop guard
/// rather than by a trailing statement: a failing assertion panics, and code
/// after the assertion would never run, leaving the "error" entry behind for
/// other tests in the same process.
#[test]
fn runtime_error_status_overrides_channel_status() {
    /// Removes the named provider's runtime-status entry when dropped,
    /// including while unwinding from a failed assertion.
    struct RuntimeStatusCleanup(&'static str);

    impl Drop for RuntimeStatusCleanup {
        fn drop(&mut self) {
            PROVIDER_RUNTIME_STATUS.remove(self.0);
        }
    }

    let provider_id = "hsex";
    PROVIDER_RUNTIME_STATUS.insert(provider_id.to_string(), CHANNEL_STATUS_ERROR.to_string());
    // Armed right after the insert so every exit path cleans up.
    let _cleanup = RuntimeStatusCleanup(provider_id);

    let channel = decorate_channel(base_channel(provider_id));
    assert_eq!(channel.status, CHANNEL_STATUS_ERROR);
}
/// Live sweep (ignored by default): posts a default request to `/api/videos`
/// for every registered provider channel except "all" and checks the media
/// URLs of the first five returned items.
///
/// For each channel, in id order:
/// * providers with a skip reason are recorded and not requested;
/// * a non-success status, an undecodable body, or fewer than five items is
///   recorded as a failure;
/// * the first five items are probed via `assert_media_response`; the first
///   empty URL or failed probe is recorded and ends that channel's checks.
///
/// The skipped-provider report is written to stderr before the final
/// assertion so it is still visible when the sweep fails.
#[ntex::test]
#[ignore = "live network sweep across all providers"]
async fn api_videos_returns_working_media_urls_for_all_channels() {
    let app = test::init_service(
        web::App::new()
            .state(test_db_pool())
            .state(VideoCache::new().max_size(10_000).to_owned())
            .state(Requester::new())
            .service(web::scope("/api").configure(crate::api::config)),
    )
    .await;
    let client_version = ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string());
    let mut channels = ALL_PROVIDERS
        .iter()
        .filter(|(provider_id, _)| **provider_id != "all")
        .filter(|(provider_id, _)| provider_filter_matches(provider_id))
        .filter_map(|(_, provider)| provider.get_channel(client_version.clone()))
        .collect::<Vec<_>>();
    // Sort by id so the sweep order (and the failure report) is stable.
    channels.sort_by(|a, b| a.id.cmp(&b.id));
    let mut failures = Vec::new();
    let mut skipped = Vec::new();
    for channel in channels {
        let provider_id = channel.id.clone();
        if let Some(reason) = skip_reason_for_provider(&provider_id) {
            skipped.push(format!("{provider_id}: {reason}"));
            continue;
        }
        // Build the payload only for providers that are actually requested.
        let payload = request_for_channel(&channel);
        let request = test::TestRequest::post()
            .uri("/api/videos")
            .header(
                header::USER_AGENT,
                "Hot%20Tub/22c CFNetwork/1494.0.7 Darwin/23.4.0",
            )
            .set_json(&payload)
            .to_request();
        let response = test::call_service(&app, request).await;
        let status = response.status();
        let body = test::read_body(response).await;
        if !status.is_success() {
            failures.push(format!(
                "{provider_id} returned status {status}: {}",
                String::from_utf8_lossy(&body)
            ));
            continue;
        }
        let payload: ApiVideosResponse = match serde_json::from_slice(&body) {
            Ok(payload) => payload,
            Err(error) => {
                failures.push(format!(
                    "{provider_id} returned invalid JSON: {error}; body={}",
                    String::from_utf8_lossy(&body)
                ));
                continue;
            }
        };
        if payload.items.len() < 5 {
            failures.push(format!(
                "{provider_id} returned fewer than 5 items: {}",
                payload.items.len()
            ));
            continue;
        }
        // Probe the first five items; report only the first problem per provider.
        for (item_index, item) in payload.items.iter().take(5).enumerate() {
            let (url, headers) = media_target(item);
            if url.is_empty() {
                failures.push(format!(
                    "{provider_id} item {} returned an empty media url",
                    item_index + 1
                ));
                break;
            }
            if let Err(error) =
                assert_media_response(&provider_id, item_index, &url, headers).await
            {
                failures.push(error);
                break;
            }
        }
    }
    // Report skips before asserting: the assertion panics on failure, which
    // would otherwise suppress this output exactly when it is most useful.
    if !skipped.is_empty() {
        eprintln!("skipped providers:\n{}", skipped.join("\n"));
    }
    assert!(
        failures.is_empty(),
        "provider live sweep failed:\n{}",
        failures.join("\n")
    );
}
/// Live search sweep (ignored by default): for every registered provider
/// channel except "all", derives search queries from a baseline listing and
/// checks the media URLs of the first five search results.
///
/// For each channel, in id order:
/// * providers with a skip reason are recorded and not requested;
/// * a baseline `/api/videos` request must succeed, decode, and return at
///   least one item, otherwise a failure is recorded;
/// * up to 12 queries from `search_queries_for_channel` are tried (blank ones
///   are passed over) until one returns at least five items; if none does,
///   the last error seen is recorded;
/// * the first five items of the selected result are probed via
///   `assert_media_response`; the first empty URL or failed probe is recorded
///   and ends that channel's checks.
///
/// The skipped-provider report is written to stderr before the final
/// assertion so it is still visible when the sweep fails.
#[ntex::test]
#[ignore = "live search sweep across all providers"]
async fn api_videos_search_returns_working_media_urls_for_all_channels() {
    // Client identity sent with every request in this sweep.
    const CLIENT_USER_AGENT: &str = "Hot%20Tub/22c CFNetwork/1494.0.7 Darwin/23.4.0";

    let app = test::init_service(
        web::App::new()
            .state(test_db_pool())
            .state(VideoCache::new().max_size(10_000).to_owned())
            .state(Requester::new())
            .service(web::scope("/api").configure(crate::api::config)),
    )
    .await;
    let client_version = ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string());
    let mut channels = ALL_PROVIDERS
        .iter()
        .filter(|(provider_id, _)| **provider_id != "all")
        .filter(|(provider_id, _)| provider_filter_matches(provider_id))
        .filter_map(|(_, provider)| provider.get_channel(client_version.clone()))
        .collect::<Vec<_>>();
    // Sort by id so the sweep order (and the failure report) is stable.
    channels.sort_by(|a, b| a.id.cmp(&b.id));
    let mut failures = Vec::new();
    let mut skipped = Vec::new();
    for channel in channels {
        let provider_id = channel.id.clone();
        if let Some(reason) = skip_reason_for_provider(&provider_id) {
            skipped.push(format!("{provider_id}: {reason}"));
            continue;
        }

        // Baseline listing: its items seed the search queries below.
        let baseline_payload = request_for_channel(&channel);
        let baseline_request = test::TestRequest::post()
            .uri("/api/videos")
            .header(header::USER_AGENT, CLIENT_USER_AGENT)
            .set_json(&baseline_payload)
            .to_request();
        let baseline_response = test::call_service(&app, baseline_request).await;
        let baseline_status = baseline_response.status();
        let baseline_body = test::read_body(baseline_response).await;
        if !baseline_status.is_success() {
            failures.push(format!(
                "{provider_id} baseline request returned status {baseline_status}: {}",
                String::from_utf8_lossy(&baseline_body)
            ));
            continue;
        }
        let baseline: ApiVideosResponse = match serde_json::from_slice(&baseline_body) {
            Ok(payload) => payload,
            Err(error) => {
                failures.push(format!(
                    "{provider_id} baseline returned invalid JSON: {error}; body={}",
                    String::from_utf8_lossy(&baseline_body)
                ));
                continue;
            }
        };
        if baseline.items.is_empty() {
            failures.push(format!(
                "{provider_id} baseline returned no items for search seed"
            ));
            continue;
        }

        // Try candidate queries until one yields enough results; remember the
        // most recent problem so it can be reported if every query falls short.
        let mut selected_payload: Option<ApiVideosResponse> = None;
        let mut last_error: Option<String> = None;
        for search_query in search_queries_for_channel(&provider_id, &baseline.items)
            .into_iter()
            .take(12)
        {
            if search_query.trim().is_empty() {
                continue;
            }
            let payload = request_for_channel_with_query(&channel, search_query.clone());
            let request = test::TestRequest::post()
                .uri("/api/videos")
                .header(header::USER_AGENT, CLIENT_USER_AGENT)
                .set_json(&payload)
                .to_request();
            let response = test::call_service(&app, request).await;
            let status = response.status();
            let body = test::read_body(response).await;
            if !status.is_success() {
                last_error = Some(format!(
                    "{provider_id} search query={search_query} returned status {status}: {}",
                    String::from_utf8_lossy(&body)
                ));
                continue;
            }
            let payload: ApiVideosResponse = match serde_json::from_slice(&body) {
                Ok(payload) => payload,
                Err(error) => {
                    last_error = Some(format!(
                        "{provider_id} search query={search_query} returned invalid JSON: {error}; body={}",
                        String::from_utf8_lossy(&body)
                    ));
                    continue;
                }
            };
            if payload.items.len() >= 5 {
                selected_payload = Some(payload);
                break;
            }
            last_error = Some(format!(
                "{provider_id} search query={search_query} returned fewer than 5 items: {}",
                payload.items.len()
            ));
        }
        let Some(payload) = selected_payload else {
            failures.push(last_error.unwrap_or_else(|| {
                format!("{provider_id} search did not yield at least 5 items")
            }));
            continue;
        };

        // Probe the first five items; report only the first problem per provider.
        for (item_index, item) in payload.items.iter().take(5).enumerate() {
            let (url, headers) = media_target(item);
            if url.is_empty() {
                failures.push(format!(
                    "{provider_id} search item {} returned an empty media url",
                    item_index + 1
                ));
                break;
            }
            if let Err(error) =
                assert_media_response(&provider_id, item_index, &url, headers).await
            {
                failures.push(error);
                break;
            }
        }
    }
    // Report skips before asserting: the assertion panics on failure, which
    // would otherwise suppress this output exactly when it is most useful.
    if !skipped.is_empty() {
        eprintln!("skipped providers:\n{}", skipped.join("\n"));
    }
    assert!(
        failures.is_empty(),
        "provider live search sweep failed:\n{}",
        failures.join("\n")
    );
}
}