upgrades
This commit is contained in:
@@ -36,10 +36,11 @@ pub static ALL_PROVIDERS: Lazy<HashMap<&'static str, DynProvider>> = Lazy::new(|
|
||||
|
||||
const CHANNEL_STATUS_ERROR: &str = "error";
|
||||
const VALIDATION_RESULTS_REQUIRED: usize = 5;
|
||||
const VALIDATION_MIN_SUCCESS: usize = 3;
|
||||
const VALIDATION_MIN_SUCCESS: usize = 1;
|
||||
const VALIDATION_COOLDOWN: Duration = Duration::from_secs(3600);
|
||||
const VALIDATION_MEDIA_TIMEOUT: Duration = Duration::from_secs(100);
|
||||
const VALIDATION_ERROR_RETEST_INTERVAL: Duration = Duration::from_secs(5 * 60);
|
||||
const VALIDATION_ERROR_RETEST_INTERVAL: Duration = VALIDATION_COOLDOWN;
|
||||
const VALIDATION_FAILURES_FOR_ERROR: u8 = 5;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ProviderValidationContext {
|
||||
@@ -48,10 +49,18 @@ struct ProviderValidationContext {
|
||||
requester: Requester,
|
||||
}
|
||||
|
||||
/// Per-provider bookkeeping for consecutive validation failures.
///
/// `Debug` is derived so the state can be printed in logs and assertion
/// messages; both fields are plain `Copy` values.
#[derive(Clone, Copy, Debug)]
struct ValidationFailureState {
    /// Number of failures counted so far in the current streak.
    consecutive_failures: u8,
    /// Instant at which the streak counter was last set or incremented.
    last_counted_at: Instant,
}
|
||||
|
||||
static PROVIDER_VALIDATION_CONTEXT: OnceLock<ProviderValidationContext> = OnceLock::new();
|
||||
static PROVIDER_RUNTIME_STATUS: Lazy<DashMap<String, String>> = Lazy::new(DashMap::new);
|
||||
static PROVIDER_VALIDATION_INFLIGHT: Lazy<DashSet<String>> = Lazy::new(DashSet::new);
|
||||
static PROVIDER_VALIDATION_LAST_RUN: Lazy<DashMap<String, Instant>> = Lazy::new(DashMap::new);
|
||||
static PROVIDER_VALIDATION_FAILURE_STATE: Lazy<DashMap<String, ValidationFailureState>> =
|
||||
Lazy::new(DashMap::new);
|
||||
static PROVIDER_ERROR_REVALIDATION_STARTED: OnceLock<()> = OnceLock::new();
|
||||
|
||||
fn validation_client_version() -> ClientVersion {
|
||||
@@ -107,12 +116,27 @@ fn validation_request_for_channel(channel: &Channel) -> VideosRequest {
|
||||
}
|
||||
}
|
||||
|
||||
fn media_target(item: &VideoItem) -> (String, Vec<(String, String)>) {
|
||||
if let Some(format) = item.formats.as_ref().and_then(|formats| formats.first()) {
|
||||
return (format.url.clone(), format.http_headers_pairs());
|
||||
fn media_targets(item: &VideoItem) -> Vec<(String, Vec<(String, String)>)> {
|
||||
let mut targets = Vec::new();
|
||||
|
||||
if let Some(formats) = item.formats.as_ref() {
|
||||
for format in formats {
|
||||
if format.url.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
targets.push((format.url.clone(), format.http_headers_pairs()));
|
||||
}
|
||||
}
|
||||
|
||||
(item.url.clone(), Vec::new())
|
||||
if !item.url.trim().is_empty()
|
||||
&& !targets
|
||||
.iter()
|
||||
.any(|(url, _)| url.eq_ignore_ascii_case(item.url.as_str()))
|
||||
{
|
||||
targets.push((item.url.clone(), Vec::new()));
|
||||
}
|
||||
|
||||
targets
|
||||
}
|
||||
|
||||
fn looks_like_media(content_type: &str, body: &[u8]) -> bool {
|
||||
@@ -131,6 +155,23 @@ fn looks_like_media(content_type: &str, body: &[u8]) -> bool {
|
||||
|| body.windows(4).any(|window| window == b"mdat")
|
||||
}
|
||||
|
||||
/// Returns `true` when `error` contains any marker that this module treats as
/// a transient (retry-worthy) validation failure.
///
/// Matching is a case-insensitive (ASCII) substring search over the whole
/// message.
// NOTE(review): because the whole message is searched, text embedded in the
// message (for example a URL containing "dns" or "timeout") would also match —
// confirm this is acceptable for the error strings produced by callers.
fn is_transient_validation_error(error: &str) -> bool {
    // All markers are lowercase; the message is lowercased before comparison.
    const TRANSIENT_MARKERS: &[&str] = &[
        "client error (connect)",
        "timed out",
        "timeout",
        "dns",
        "connection reset",
        "connection refused",
        "temporarily unavailable",
        "request returned 403",
        "request returned 429",
        "request returned 500",
        "request returned 502",
        "request returned 503",
        "request returned 504",
    ];

    let lowered = error.to_ascii_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|&marker| lowered.contains(marker))
}
|
||||
|
||||
async fn validate_media_response(
|
||||
provider_id: &str,
|
||||
item_index: usize,
|
||||
@@ -256,42 +297,100 @@ async fn run_provider_validation(provider_id: &str) -> Result<(), String> {
|
||||
}
|
||||
|
||||
let mut successes = 0usize;
|
||||
let mut failures = Vec::new();
|
||||
let mut hard_failures = Vec::new();
|
||||
let mut transient_failures = Vec::new();
|
||||
for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() {
|
||||
let (url, headers) = media_target(item);
|
||||
if url.is_empty() {
|
||||
failures.push(format!(
|
||||
let targets = media_targets(item);
|
||||
if targets.is_empty() {
|
||||
hard_failures.push(format!(
|
||||
"{provider_id} item {} returned an empty media url",
|
||||
item_index + 1
|
||||
));
|
||||
continue;
|
||||
}
|
||||
|
||||
match validate_media_response(
|
||||
provider_id,
|
||||
item_index,
|
||||
&url,
|
||||
headers,
|
||||
context.requester.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
successes += 1;
|
||||
if successes >= VALIDATION_MIN_SUCCESS {
|
||||
return Ok(());
|
||||
let mut item_errors = Vec::new();
|
||||
let mut item_validated = false;
|
||||
for (url, headers) in targets {
|
||||
if url.starts_with('/') {
|
||||
continue;
|
||||
}
|
||||
item_validated = true;
|
||||
match validate_media_response(
|
||||
provider_id,
|
||||
item_index,
|
||||
&url,
|
||||
headers,
|
||||
context.requester.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
successes += 1;
|
||||
if successes >= VALIDATION_MIN_SUCCESS {
|
||||
return Ok(());
|
||||
}
|
||||
item_errors.clear();
|
||||
break;
|
||||
}
|
||||
Err(error) => item_errors.push(error),
|
||||
}
|
||||
}
|
||||
|
||||
if item_validated && !item_errors.is_empty() {
|
||||
for error in item_errors {
|
||||
if is_transient_validation_error(&error) {
|
||||
transient_failures.push(error);
|
||||
} else {
|
||||
hard_failures.push(error);
|
||||
}
|
||||
}
|
||||
Err(error) => failures.push(error),
|
||||
}
|
||||
}
|
||||
|
||||
if successes >= VALIDATION_MIN_SUCCESS {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if hard_failures.is_empty() && !transient_failures.is_empty() {
|
||||
crate::flow_debug!(
|
||||
"provider validation inconclusive provider={} transient_failures={}",
|
||||
provider_id,
|
||||
transient_failures.len()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(format!(
|
||||
"{provider_id} validation failed: only {successes} media checks passed (required at least {VALIDATION_MIN_SUCCESS}); failures={}",
|
||||
failures.join(" | ")
|
||||
"{provider_id} validation failed: only {successes} media checks passed (required at least {VALIDATION_MIN_SUCCESS}); hard_failures={}; transient_failures={}",
|
||||
hard_failures.join(" | "),
|
||||
transient_failures.join(" | ")
|
||||
))
|
||||
}
|
||||
|
||||
fn reset_validation_failure_state(provider_id: &str) {
|
||||
PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
|
||||
}
|
||||
|
||||
fn record_validation_failure(provider_id: &str, now: Instant) -> u8 {
|
||||
if let Some(mut state) = PROVIDER_VALIDATION_FAILURE_STATE.get_mut(provider_id) {
|
||||
if now.duration_since(state.last_counted_at) >= VALIDATION_COOLDOWN {
|
||||
state.consecutive_failures = state.consecutive_failures.saturating_add(1);
|
||||
state.last_counted_at = now;
|
||||
}
|
||||
return state.consecutive_failures;
|
||||
}
|
||||
|
||||
PROVIDER_VALIDATION_FAILURE_STATE.insert(
|
||||
provider_id.to_string(),
|
||||
ValidationFailureState {
|
||||
consecutive_failures: 1,
|
||||
last_counted_at: now,
|
||||
},
|
||||
);
|
||||
1
|
||||
}
|
||||
|
||||
fn start_periodic_error_revalidation() {
|
||||
if PROVIDER_ERROR_REVALIDATION_STARTED.set(()).is_err() {
|
||||
return;
|
||||
@@ -383,14 +482,20 @@ pub fn schedule_provider_validation(provider_id: &str, context: &str, msg: &str)
|
||||
let validation_result = run_provider_validation(&provider_id).await;
|
||||
match validation_result {
|
||||
Ok(()) => {
|
||||
reset_validation_failure_state(&provider_id);
|
||||
PROVIDER_RUNTIME_STATUS.remove(&provider_id);
|
||||
}
|
||||
Err(_validation_error) => {
|
||||
PROVIDER_RUNTIME_STATUS
|
||||
.insert(provider_id.clone(), CHANNEL_STATUS_ERROR.to_string());
|
||||
let failures = record_validation_failure(&provider_id, Instant::now());
|
||||
if failures >= VALIDATION_FAILURES_FOR_ERROR {
|
||||
PROVIDER_RUNTIME_STATUS
|
||||
.insert(provider_id.clone(), CHANNEL_STATUS_ERROR.to_string());
|
||||
}
|
||||
crate::flow_debug!(
|
||||
"provider validation failed provider={} error={}",
|
||||
"provider validation failed provider={} failures={} threshold={} error={}",
|
||||
&provider_id,
|
||||
failures,
|
||||
VALIDATION_FAILURES_FOR_ERROR,
|
||||
crate::util::flow_debug::preview(&_validation_error, 160)
|
||||
);
|
||||
}
|
||||
@@ -790,6 +895,8 @@ mod tests {
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ApiVideoItem {
|
||||
#[serde(default)]
|
||||
title: String,
|
||||
url: String,
|
||||
formats: Option<Vec<ApiVideoFormat>>,
|
||||
}
|
||||
@@ -880,6 +987,41 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn request_for_channel_with_query(channel: &Channel, query: String) -> VideosRequest {
|
||||
let mut request = request_for_channel(channel);
|
||||
request.query = Some(query);
|
||||
request
|
||||
}
|
||||
|
||||
fn search_queries_for_channel(provider_id: &str, items: &[ApiVideoItem]) -> Vec<String> {
|
||||
let mut candidates = Vec::new();
|
||||
match provider_id {
|
||||
"yesporn" => candidates.push("anal".to_string()),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
for item in items {
|
||||
for token in item.title.split_whitespace() {
|
||||
let cleaned = token
|
||||
.chars()
|
||||
.filter(|ch| ch.is_alphanumeric())
|
||||
.collect::<String>();
|
||||
if cleaned.len() >= 3
|
||||
&& !candidates
|
||||
.iter()
|
||||
.any(|existing| existing.eq_ignore_ascii_case(&cleaned))
|
||||
{
|
||||
candidates.push(cleaned);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if candidates.is_empty() {
|
||||
candidates.push("video".to_string());
|
||||
}
|
||||
candidates
|
||||
}
|
||||
|
||||
fn skip_reason_for_provider(provider_id: &str) -> Option<&'static str> {
|
||||
if std::env::var("FLARE_URL").is_ok() {
|
||||
return None;
|
||||
@@ -893,6 +1035,13 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tells whether `provider_id` passes the `HOTTUB_TEST_PROVIDER` filter.
///
/// Every provider passes when the variable cannot be read or is blank after
/// trimming; otherwise only the provider whose id equals the trimmed value
/// passes.
fn provider_filter_matches(provider_id: &str) -> bool {
    std::env::var("HOTTUB_TEST_PROVIDER")
        .map(|raw| {
            let wanted = raw.trim();
            wanted.is_empty() || wanted == provider_id
        })
        .unwrap_or(true)
}
|
||||
|
||||
fn media_target(item: &ApiVideoItem) -> (String, Vec<(String, String)>) {
|
||||
if let Some(format) = item.formats.as_ref().and_then(|formats| formats.first()) {
|
||||
let headers = format
|
||||
@@ -940,7 +1089,12 @@ mod tests {
|
||||
let response = requester
|
||||
.get_raw_with_headers_timeout(url, headers, Some(VALIDATION_MEDIA_TIMEOUT))
|
||||
.await
|
||||
.map_err(|err| format!("{provider_id} item {} request failed for {url}: {err}", item_index + 1))?;
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"{provider_id} item {} request failed for {url}: {err}",
|
||||
item_index + 1
|
||||
)
|
||||
})?;
|
||||
|
||||
let status = response.status();
|
||||
if !status.is_success() {
|
||||
@@ -956,10 +1110,12 @@ mod tests {
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let body = response
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|err| format!("{provider_id} item {} body read failed for {url}: {err}", item_index + 1))?;
|
||||
let body = response.bytes().await.map_err(|err| {
|
||||
format!(
|
||||
"{provider_id} item {} body read failed for {url}: {err}",
|
||||
item_index + 1
|
||||
)
|
||||
})?;
|
||||
|
||||
if body.is_empty() {
|
||||
return Err(format!(
|
||||
@@ -997,6 +1153,61 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// A failure only extends the streak when at least `VALIDATION_COOLDOWN` has
/// passed since the previously counted failure.
#[test]
fn validation_failure_streak_requires_hourly_spacing() {
    // The failure-state map is process-wide and the test harness runs tests on
    // several threads by default, so this test uses a key no other test
    // touches instead of a real provider id.
    let provider_id = "test-validation-streak-spacing";
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);

    let now = Instant::now();
    // First failure starts the streak.
    assert_eq!(record_validation_failure(provider_id, now), 1);
    // A second failure at the same instant is inside the cooldown: not counted.
    assert_eq!(record_validation_failure(provider_id, now), 1);
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN),
        2
    );
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN * 2),
        3
    );

    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
|
||||
|
||||
/// Resetting the failure state makes the next failure start a new streak at 1.
#[test]
fn validation_failure_streak_resets_after_success() {
    // The failure-state map is process-wide and the test harness runs tests on
    // several threads by default, so this test uses a key no other test
    // touches instead of a real provider id.
    let provider_id = "test-validation-streak-reset";
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);

    let now = Instant::now();
    assert_eq!(record_validation_failure(provider_id, now), 1);
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN),
        2
    );

    reset_validation_failure_state(provider_id);
    assert_eq!(
        record_validation_failure(provider_id, now + VALIDATION_COOLDOWN * 2),
        1
    );

    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
|
||||
|
||||
/// `VALIDATION_FAILURES_FOR_ERROR` failures, each spaced one
/// `VALIDATION_COOLDOWN` apart, bring the streak exactly to the threshold.
#[test]
fn validation_failure_threshold_matches_channel_error_policy() {
    // The failure-state map is process-wide and the test harness runs tests on
    // several threads by default, so this test uses a key no other test
    // touches instead of a real provider id.
    let provider_id = "test-validation-threshold";
    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);

    let now = Instant::now();
    let mut counted = 0;
    for step in 0..VALIDATION_FAILURES_FOR_ERROR {
        // Lossless widening of the u8 step to the u32 multiplier `Duration` expects.
        let observed_at = now + VALIDATION_COOLDOWN * u32::from(step);
        counted = record_validation_failure(provider_id, observed_at);
    }
    assert_eq!(counted, VALIDATION_FAILURES_FOR_ERROR);

    PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
|
||||
|
||||
#[test]
|
||||
fn builds_group_index() {
|
||||
PROVIDER_RUNTIME_STATUS.remove("all");
|
||||
@@ -1114,6 +1325,7 @@ mod tests {
|
||||
let mut channels = ALL_PROVIDERS
|
||||
.iter()
|
||||
.filter(|(provider_id, _)| **provider_id != "all")
|
||||
.filter(|(provider_id, _)| provider_filter_matches(provider_id))
|
||||
.filter_map(|(_, provider)| provider.get_channel(client_version.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
channels.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
@@ -1199,4 +1411,163 @@ mod tests {
|
||||
eprintln!("skipped providers:\n{}", skipped.join("\n"));
|
||||
}
|
||||
}
|
||||
|
||||
#[ntex::test]
|
||||
#[ignore = "live search sweep across all providers"]
|
||||
async fn api_videos_search_returns_working_media_urls_for_all_channels() {
|
||||
let app = test::init_service(
|
||||
web::App::new()
|
||||
.state(test_db_pool())
|
||||
.state(VideoCache::new().max_size(10_000).to_owned())
|
||||
.state(Requester::new())
|
||||
.service(web::scope("/api").configure(crate::api::config)),
|
||||
)
|
||||
.await;
|
||||
|
||||
let client_version = ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string());
|
||||
let mut channels = ALL_PROVIDERS
|
||||
.iter()
|
||||
.filter(|(provider_id, _)| **provider_id != "all")
|
||||
.filter(|(provider_id, _)| provider_filter_matches(provider_id))
|
||||
.filter_map(|(_, provider)| provider.get_channel(client_version.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
channels.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
|
||||
let mut failures = Vec::new();
|
||||
let mut skipped = Vec::new();
|
||||
|
||||
for channel in channels {
|
||||
let provider_id = channel.id.clone();
|
||||
|
||||
if let Some(reason) = skip_reason_for_provider(&provider_id) {
|
||||
skipped.push(format!("{provider_id}: {reason}"));
|
||||
continue;
|
||||
}
|
||||
|
||||
let baseline_payload = request_for_channel(&channel);
|
||||
let baseline_request = test::TestRequest::post()
|
||||
.uri("/api/videos")
|
||||
.header(
|
||||
header::USER_AGENT,
|
||||
"Hot%20Tub/22c CFNetwork/1494.0.7 Darwin/23.4.0",
|
||||
)
|
||||
.set_json(&baseline_payload)
|
||||
.to_request();
|
||||
|
||||
let baseline_response = test::call_service(&app, baseline_request).await;
|
||||
let baseline_status = baseline_response.status();
|
||||
let baseline_body = test::read_body(baseline_response).await;
|
||||
if !baseline_status.is_success() {
|
||||
failures.push(format!(
|
||||
"{provider_id} baseline request returned status {baseline_status}: {}",
|
||||
String::from_utf8_lossy(&baseline_body)
|
||||
));
|
||||
continue;
|
||||
}
|
||||
let baseline: ApiVideosResponse = match serde_json::from_slice(&baseline_body) {
|
||||
Ok(payload) => payload,
|
||||
Err(error) => {
|
||||
failures.push(format!(
|
||||
"{provider_id} baseline returned invalid JSON: {error}; body={}",
|
||||
String::from_utf8_lossy(&baseline_body)
|
||||
));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if baseline.items.is_empty() {
|
||||
failures.push(format!(
|
||||
"{provider_id} baseline returned no items for search seed"
|
||||
));
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut selected_payload: Option<ApiVideosResponse> = None;
|
||||
let mut last_error: Option<String> = None;
|
||||
for search_query in search_queries_for_channel(&provider_id, &baseline.items)
|
||||
.into_iter()
|
||||
.take(12)
|
||||
{
|
||||
if search_query.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let payload = request_for_channel_with_query(&channel, search_query.clone());
|
||||
let request = test::TestRequest::post()
|
||||
.uri("/api/videos")
|
||||
.header(
|
||||
header::USER_AGENT,
|
||||
"Hot%20Tub/22c CFNetwork/1494.0.7 Darwin/23.4.0",
|
||||
)
|
||||
.set_json(&payload)
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&app, request).await;
|
||||
let status = response.status();
|
||||
let body = test::read_body(response).await;
|
||||
|
||||
if !status.is_success() {
|
||||
last_error = Some(format!(
|
||||
"{provider_id} search query={search_query} returned status {status}: {}",
|
||||
String::from_utf8_lossy(&body)
|
||||
));
|
||||
continue;
|
||||
}
|
||||
|
||||
let payload: ApiVideosResponse = match serde_json::from_slice(&body) {
|
||||
Ok(payload) => payload,
|
||||
Err(error) => {
|
||||
last_error = Some(format!(
|
||||
"{provider_id} search query={search_query} returned invalid JSON: {error}; body={}",
|
||||
String::from_utf8_lossy(&body)
|
||||
));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if payload.items.len() >= 5 {
|
||||
selected_payload = Some(payload);
|
||||
break;
|
||||
}
|
||||
last_error = Some(format!(
|
||||
"{provider_id} search query={search_query} returned fewer than 5 items: {}",
|
||||
payload.items.len()
|
||||
));
|
||||
}
|
||||
|
||||
let Some(payload) = selected_payload else {
|
||||
failures.push(last_error.unwrap_or_else(|| {
|
||||
format!("{provider_id} search did not yield at least 5 items")
|
||||
}));
|
||||
continue;
|
||||
};
|
||||
|
||||
for (item_index, item) in payload.items.iter().take(5).enumerate() {
|
||||
let (url, headers) = media_target(item);
|
||||
if url.is_empty() {
|
||||
failures.push(format!(
|
||||
"{provider_id} search item {} returned an empty media url",
|
||||
item_index + 1
|
||||
));
|
||||
break;
|
||||
}
|
||||
|
||||
if let Err(error) =
|
||||
assert_media_response(&provider_id, item_index, &url, headers).await
|
||||
{
|
||||
failures.push(error);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
failures.is_empty(),
|
||||
"provider live search sweep failed:\n{}",
|
||||
failures.join("\n")
|
||||
);
|
||||
|
||||
if !skipped.is_empty() {
|
||||
eprintln!("skipped providers:\n{}", skipped.join("\n"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user