detect m3u8 is actually mp4

This commit is contained in:
Simon
2026-02-09 18:35:39 +00:00
parent 5baca567cb
commit c2872c1883

View File

@@ -211,6 +211,17 @@ def stream_video():
path = urllib.parse.urlparse(url).path.lower()
return any(path.endswith(ext) for ext in ('.mp4', '.m4v', '.m4s', '.ts', '.webm', '.mov'))
def looks_like_m3u8_bytes(chunk):
if not chunk:
return False
sample = chunk.lstrip(b'\xef\xbb\xbf')
return b'#EXTM3U' in sample[:1024]
def looks_like_mp4_bytes(chunk):
if not chunk or len(chunk) < 8:
return False
return chunk[4:8] == b'ftyp'
def build_upstream_headers(referer):
headers = {
'User-Agent': request.headers.get('User-Agent'),
@@ -236,6 +247,36 @@ def stream_video():
# Remove keys with None values
return {k: v for k, v in headers.items() if v}
def build_forwarded_headers(resp, target_url=None, content_type_override=None):
hop_by_hop = {
'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization',
'te', 'trailers', 'transfer-encoding', 'upgrade'
}
forwarded_headers = []
response_content_type = None
for name, value in resp.headers.items():
if name.lower() in hop_by_hop:
continue
if name.lower() == 'content-length':
forwarded_headers.append((name, value))
continue
if name.lower() == 'content-type':
response_content_type = value
if name.lower() == 'content-type' and content_type_override:
continue
forwarded_headers.append((name, value))
if not content_type_override:
if not response_content_type or 'application/octet-stream' in response_content_type:
content_type_override = guess_content_type(target_url or resp.url)
if content_type_override:
forwarded_headers.append(('Content-Type', content_type_override))
dbg(f"content_type_override={content_type_override}")
return forwarded_headers
def proxy_response(target_url, content_type_override=None, referer_override=None):
# Extract the base domain to spoof the referer
request_referer = request.args.get('referer')
@@ -267,13 +308,7 @@ def stream_video():
except StopIteration:
first_chunk = b""
def looks_like_m3u8(chunk):
if not chunk:
return False
sample = chunk.lstrip(b'\xef\xbb\xbf')
return b'#EXTM3U' in sample[:1024]
if looks_like_m3u8(first_chunk):
if looks_like_m3u8_bytes(first_chunk):
remaining = b"".join(chunk for chunk in content_iter if chunk)
body_bytes = first_chunk + remaining
base_url = resp.url
@@ -288,32 +323,11 @@ def stream_video():
prefetched_encoding=encoding,
)
hop_by_hop = {
'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization',
'te', 'trailers', 'transfer-encoding', 'upgrade'
}
forwarded_headers = []
response_content_type = None
for name, value in resp.headers.items():
if name.lower() in hop_by_hop:
continue
if name.lower() == 'content-length':
forwarded_headers.append((name, value))
continue
if name.lower() == 'content-type':
response_content_type = value
if name.lower() == 'content-type' and content_type_override:
continue
forwarded_headers.append((name, value))
if not content_type_override:
if not response_content_type or 'application/octet-stream' in response_content_type:
content_type_override = guess_content_type(target_url)
if content_type_override:
forwarded_headers.append(('Content-Type', content_type_override))
dbg(f"content_type_override={content_type_override}")
forwarded_headers = build_forwarded_headers(
resp,
target_url=target_url,
content_type_override=content_type_override,
)
if request.method == 'HEAD':
resp.close()
@@ -374,10 +388,47 @@ def stream_video():
headers['User-Agent'] = 'Mozilla/5.0'
if 'Accept' not in headers:
headers['Accept'] = '*/*'
resp = session.get(playlist_url, headers=headers, timeout=30)
resp = session.get(playlist_url, headers=headers, stream=True, timeout=30)
resp.raise_for_status()
base_url = resp.url
body_text = resp.text
if request.method == 'HEAD':
forwarded_headers = build_forwarded_headers(resp, target_url=base_url)
resp.close()
return Response("", status=resp.status_code, headers=forwarded_headers)
content_iter = resp.iter_content(chunk_size=1024 * 16)
try:
first_chunk = next(content_iter)
except StopIteration:
first_chunk = b""
if looks_like_m3u8_bytes(first_chunk):
remaining = b"".join(chunk for chunk in content_iter if chunk)
body_bytes = first_chunk + remaining
body_text = decode_playlist_body(body_bytes, resp.encoding)
resp.close()
else:
content_type_override = None
if looks_like_mp4_bytes(first_chunk):
content_type_override = 'video/mp4'
forwarded_headers = build_forwarded_headers(
resp,
target_url=base_url,
content_type_override=content_type_override,
)
def generate():
try:
if first_chunk:
yield first_chunk
for chunk in content_iter:
if chunk:
yield chunk
finally:
resp.close()
return Response(generate(), status=resp.status_code, headers=forwarded_headers)
else:
body_text = decode_playlist_body(prefetched_body, prefetched_encoding)