diff --git a/src/providers/mod.rs b/src/providers/mod.rs index da7a09c..8c9ccc2 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -36,8 +36,10 @@ pub static ALL_PROVIDERS: Lazy> = Lazy::new(| const CHANNEL_STATUS_ERROR: &str = "error"; const VALIDATION_RESULTS_REQUIRED: usize = 5; -const VALIDATION_COOLDOWN: Duration = Duration::from_secs(60); +const VALIDATION_MIN_SUCCESS: usize = 3; +const VALIDATION_COOLDOWN: Duration = Duration::from_secs(3600); const VALIDATION_MEDIA_TIMEOUT: Duration = Duration::from_secs(100); +const VALIDATION_ERROR_RETEST_INTERVAL: Duration = Duration::from_secs(5 * 60); #[derive(Clone)] struct ProviderValidationContext { @@ -50,6 +52,7 @@ static PROVIDER_VALIDATION_CONTEXT: OnceLock = OnceLo static PROVIDER_RUNTIME_STATUS: Lazy> = Lazy::new(DashMap::new); static PROVIDER_VALIDATION_INFLIGHT: Lazy> = Lazy::new(DashSet::new); static PROVIDER_VALIDATION_LAST_RUN: Lazy> = Lazy::new(DashMap::new); +static PROVIDER_ERROR_REVALIDATION_STARTED: OnceLock<()> = OnceLock::new(); fn validation_client_version() -> ClientVersion { ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string()) @@ -252,26 +255,72 @@ async fn run_provider_validation(provider_id: &str) -> Result<(), String> { )); } + let mut successes = 0usize; + let mut failures = Vec::new(); for (item_index, item) in items.iter().take(VALIDATION_RESULTS_REQUIRED).enumerate() { let (url, headers) = media_target(item); if url.is_empty() { - return Err(format!( + failures.push(format!( "{provider_id} item {} returned an empty media url", item_index + 1 )); + continue; } - validate_media_response( + match validate_media_response( provider_id, item_index, &url, headers, context.requester.clone(), ) - .await?; + .await + { + Ok(()) => { + successes += 1; + if successes >= VALIDATION_MIN_SUCCESS { + return Ok(()); + } + } + Err(error) => failures.push(error), + } } - Ok(()) + Err(format!( + "{provider_id} validation failed: only {successes} media checks passed (required at least {VALIDATION_MIN_SUCCESS}); failures={}", + failures.join(" | ") + )) +} + +fn start_periodic_error_revalidation() { + if PROVIDER_ERROR_REVALIDATION_STARTED.set(()).is_err() { + return; + } + + tokio::spawn(async move { + let mut interval = tokio::time::interval(VALIDATION_ERROR_RETEST_INTERVAL); + loop { + interval.tick().await; + let errored_providers = PROVIDER_RUNTIME_STATUS + .iter() + .filter_map(|entry| { + if entry.value().as_str() == CHANNEL_STATUS_ERROR { + Some(entry.key().clone()) + } else { + None + } + }) + .collect::>(); + + for provider_id in errored_providers { + schedule_provider_validation( + &provider_id, + "periodic_retest", + "provider currently marked as error", + ); + } + } + }); } pub fn configure_runtime_validation( @@ -285,7 +334,9 @@ pub fn configure_runtime_validation( cache, requester, }) - .map_err(|_| "provider validation context already configured") + .map_err(|_| "provider validation context already configured")?; + start_periodic_error_revalidation(); + Ok(()) } pub fn current_provider_channel_status(provider_id: &str) -> Option {