This commit is contained in:
Simon
2026-04-09 07:19:33 +00:00
parent 6e43b3b3d0
commit 57eb2d7063
6 changed files with 49 additions and 352 deletions

View File

@@ -37,13 +37,10 @@ pub static ALL_PROVIDERS: Lazy<HashMap<&'static str, DynProvider>> = Lazy::new(|
m
});
const CHANNEL_STATUS_ERROR: &str = "error";
const VALIDATION_RESULTS_REQUIRED: usize = 5;
const VALIDATION_MIN_SUCCESS: usize = 1;
const VALIDATION_COOLDOWN: Duration = Duration::from_secs(3600);
const VALIDATION_MEDIA_TIMEOUT: Duration = Duration::from_secs(100);
const VALIDATION_ERROR_RETEST_INTERVAL: Duration = VALIDATION_COOLDOWN;
const VALIDATION_FAILURES_FOR_ERROR: u8 = 5;
#[derive(Clone)]
struct ProviderValidationContext {
@@ -64,7 +61,6 @@ static PROVIDER_VALIDATION_INFLIGHT: Lazy<DashSet<String>> = Lazy::new(DashSet::
static PROVIDER_VALIDATION_LAST_RUN: Lazy<DashMap<String, Instant>> = Lazy::new(DashMap::new);
static PROVIDER_VALIDATION_FAILURE_STATE: Lazy<DashMap<String, ValidationFailureState>> =
Lazy::new(DashMap::new);
static PROVIDER_ERROR_REVALIDATION_STARTED: OnceLock<()> = OnceLock::new();
fn validation_client_version() -> ClientVersion {
ClientVersion::new(22, 'c' as u32, "Hot%20Tub".to_string())
@@ -394,37 +390,6 @@ fn record_validation_failure(provider_id: &str, now: Instant) -> u8 {
1
}
fn start_periodic_error_revalidation() {
if PROVIDER_ERROR_REVALIDATION_STARTED.set(()).is_err() {
return;
}
tokio::spawn(async move {
let mut interval = tokio::time::interval(VALIDATION_ERROR_RETEST_INTERVAL);
loop {
interval.tick().await;
let errored_providers = PROVIDER_RUNTIME_STATUS
.iter()
.filter_map(|entry| {
if entry.value().as_str() == CHANNEL_STATUS_ERROR {
Some(entry.key().clone())
} else {
None
}
})
.collect::<Vec<_>>();
for provider_id in errored_providers {
schedule_provider_validation(
&provider_id,
"periodic_retest",
"provider currently marked as error",
);
}
}
});
}
pub fn configure_runtime_validation(
pool: DbPool,
cache: VideoCache,
@@ -436,9 +401,7 @@ pub fn configure_runtime_validation(
cache,
requester,
})
.map_err(|_| "provider validation context already configured")?;
start_periodic_error_revalidation();
Ok(())
.map_err(|_| "provider validation context already configured")
}
pub fn current_provider_channel_status(provider_id: &str) -> Option<String> {
@@ -486,19 +449,13 @@ pub fn schedule_provider_validation(provider_id: &str, context: &str, msg: &str)
match validation_result {
Ok(()) => {
reset_validation_failure_state(&provider_id);
PROVIDER_RUNTIME_STATUS.remove(&provider_id);
}
Err(_validation_error) => {
let failures = record_validation_failure(&provider_id, Instant::now());
if failures >= VALIDATION_FAILURES_FOR_ERROR {
PROVIDER_RUNTIME_STATUS
.insert(provider_id.clone(), CHANNEL_STATUS_ERROR.to_string());
}
let _failure_count = record_validation_failure(&provider_id, Instant::now());
crate::flow_debug!(
"provider validation failed provider={} failures={} threshold={} error={}",
"provider validation failed provider={} failures={} error={}",
&provider_id,
failures,
VALIDATION_FAILURES_FOR_ERROR,
_failure_count,
crate::util::flow_debug::preview(&_validation_error, 160)
);
}
@@ -907,6 +864,7 @@ pub fn build_status_response(status: Status) -> StatusResponse {
nsfw: status.nsfw,
categories: status.categories,
options: status.options,
cdnReferrers: status.cdnReferrers,
filtersFooter: status.filtersFooter,
}
}
@@ -1354,22 +1312,6 @@ mod tests {
PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
#[test]
fn validation_failure_threshold_matches_channel_error_policy() {
let provider_id = "hsex";
PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
let now = Instant::now();
let mut counted = 0;
for step in 0..VALIDATION_FAILURES_FOR_ERROR {
counted =
record_validation_failure(provider_id, now + VALIDATION_COOLDOWN * step as u32);
}
assert_eq!(counted, VALIDATION_FAILURES_FOR_ERROR);
PROVIDER_VALIDATION_FAILURE_STATE.remove(provider_id);
}
#[test]
fn builds_group_index() {
PROVIDER_RUNTIME_STATUS.remove("all");
@@ -1453,6 +1395,12 @@ mod tests {
.expect("chinese group present");
assert_eq!(chinese_group["systemImage"], "globe");
let cdn_referrers = json["cdnReferrers"].as_array().expect("cdnReferrers array");
assert_eq!(cdn_referrers.len(), 1);
assert_eq!(cdn_referrers[0]["hostContains"], "phncdn");
assert_eq!(cdn_referrers[0]["referer"], "https://www.pornhub.com/");
assert_eq!(cdn_referrers[0]["origin"], "https://www.pornhub.com");
let pimpbunny_channel = channels
.iter()
.find(|channel| channel["id"] == "pimpbunny")
@@ -1463,14 +1411,6 @@ mod tests {
);
}
#[test]
fn runtime_error_status_overrides_channel_status() {
PROVIDER_RUNTIME_STATUS.insert("hsex".to_string(), CHANNEL_STATUS_ERROR.to_string());
let channel = decorate_channel(base_channel("hsex"));
assert_eq!(channel.status, CHANNEL_STATUS_ERROR);
PROVIDER_RUNTIME_STATUS.remove("hsex");
}
#[ntex::test]
#[ignore = "live network sweep across all providers"]
async fn api_videos_returns_working_media_urls_for_all_channels() {