140 lines
4.9 KiB
Python
140 lines
4.9 KiB
Python
"""
|
|
app/media.py
|
|
============
|
|
File-system scanning and FFprobe metadata helpers.
|
|
|
|
Public API
|
|
----------
|
|
VIDEO_EXTENSIONS : frozenset of lowercase video file suffixes
|
|
get_video_info() : run ffprobe on a single file, return a metadata dict
|
|
list_video_files(): walk a directory tree and return files above a size floor
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
VIDEO_EXTENSIONS: frozenset[str] = frozenset({
|
|
'.mp4', '.mkv', '.mov', '.avi', '.wmv', '.flv',
|
|
'.webm', '.m4v', '.mpg', '.mpeg', '.ts', '.mts',
|
|
'.m2ts', '.vob', '.ogv', '.3gp', '.3g2',
|
|
})
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FFprobe helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_video_info(filepath: str) -> dict | None:
|
|
"""
|
|
Use ffprobe to get duration, total bitrate, codec, and dimensions.
|
|
|
|
Returns a dict with the keys below, or None if ffprobe fails.
|
|
|
|
Bitrate resolution order (handles HEVC/MKV where the stream-level
|
|
bit_rate field is absent):
|
|
1. Stream-level bit_rate — present for H.264/MP4, often missing for HEVC
|
|
2. Format-level bit_rate — reliable for all containers
|
|
3. Derived from size / duration — final fallback
|
|
|
|
Returned keys
|
|
-------------
|
|
duration, bit_rate_bps, bit_rate_mbps,
|
|
target_bit_rate_bps, target_bit_rate_mbps,
|
|
size_bytes, size_gb, codec, width, height
|
|
"""
|
|
cmd = [
|
|
'ffprobe', '-v', 'error',
|
|
'-select_streams', 'v:0',
|
|
'-show_entries',
|
|
'format=duration,bit_rate,size:stream=codec_name,width,height,bit_rate',
|
|
'-of', 'json',
|
|
filepath,
|
|
]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
if result.returncode != 0:
|
|
return None
|
|
|
|
data = json.loads(result.stdout)
|
|
fmt = data.get('format', {})
|
|
stream = (data.get('streams') or [{}])[0]
|
|
|
|
duration = float(fmt.get('duration', 0))
|
|
size_bytes = int(fmt.get('size', 0))
|
|
codec = stream.get('codec_name', 'unknown')
|
|
width = stream.get('width', 0)
|
|
height = stream.get('height', 0)
|
|
|
|
stream_br = int(stream.get('bit_rate') or 0)
|
|
format_br = int(fmt.get('bit_rate') or 0)
|
|
if stream_br > 0:
|
|
bit_rate = stream_br
|
|
elif format_br > 0:
|
|
bit_rate = format_br
|
|
elif duration > 0:
|
|
bit_rate = int((size_bytes * 8) / duration)
|
|
else:
|
|
bit_rate = 0
|
|
|
|
# Target ≈ 1/3 of the total bitrate; reserve 128 kbps for audio.
|
|
audio_bps = 128_000
|
|
video_bps = bit_rate - audio_bps if bit_rate > audio_bps else bit_rate
|
|
target_video_bps = max(int(video_bps / 3), 200_000)
|
|
|
|
return {
|
|
'duration': duration,
|
|
'bit_rate_bps': bit_rate,
|
|
'bit_rate_mbps': round(bit_rate / 1_000_000, 2),
|
|
'target_bit_rate_bps': target_video_bps,
|
|
'target_bit_rate_mbps': round(target_video_bps / 1_000_000, 2),
|
|
'size_bytes': size_bytes,
|
|
'size_gb': round(size_bytes / (1024 ** 3), 3),
|
|
'codec': codec,
|
|
'width': width,
|
|
'height': height,
|
|
}
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Directory scanner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def list_video_files(directory: Path, min_size_gb: float) -> list[dict]:
|
|
"""
|
|
Recursively walk *directory* and return video files larger than
|
|
*min_size_gb* gigabytes.
|
|
|
|
Each entry is a dict with: path, name, size_bytes, size_gb.
|
|
Raises PermissionError if the root directory is inaccessible.
|
|
"""
|
|
min_bytes = min_size_gb * (1024 ** 3)
|
|
results: list[dict] = []
|
|
|
|
try:
|
|
for root, dirs, files in os.walk(directory):
|
|
dirs[:] = [d for d in dirs if not d.startswith('.')]
|
|
for fname in files:
|
|
if Path(fname).suffix.lower() in VIDEO_EXTENSIONS:
|
|
fpath = os.path.join(root, fname)
|
|
try:
|
|
fsize = os.path.getsize(fpath)
|
|
if fsize >= min_bytes:
|
|
results.append({
|
|
'path': fpath,
|
|
'name': fname,
|
|
'size_bytes': fsize,
|
|
'size_gb': round(fsize / (1024 ** 3), 3),
|
|
})
|
|
except OSError:
|
|
continue
|
|
except PermissionError as exc:
|
|
raise PermissionError(f"Cannot access directory: {exc}") from exc
|
|
|
|
return results
|