video_press/app/media.py

140 lines
4.9 KiB
Python

"""
app/media.py
============
File-system scanning and FFprobe metadata helpers.
Public API
----------
VIDEO_EXTENSIONS : frozenset of lowercase video file suffixes
get_video_info() : run ffprobe on a single file, return a metadata dict
list_video_files(): walk a directory tree and return video files at or above a size floor
"""
import json
import os
import subprocess
from pathlib import Path
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Lowercase file suffixes (leading dot included) that are treated as video.
# Membership tests should use an already-lowercased suffix.
VIDEO_EXTENSIONS: frozenset[str] = frozenset((
    '.3g2', '.3gp', '.avi', '.flv', '.m2ts', '.m4v',
    '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts',
    '.ogv', '.ts', '.vob', '.webm', '.wmv',
))
# ---------------------------------------------------------------------------
# FFprobe helper
# ---------------------------------------------------------------------------
def get_video_info(filepath: str) -> dict | None:
"""
Use ffprobe to get duration, total bitrate, codec, and dimensions.
Returns a dict with the keys below, or None if ffprobe fails.
Bitrate resolution order (handles HEVC/MKV where the stream-level
bit_rate field is absent):
1. Stream-level bit_rate — present for H.264/MP4, often missing for HEVC
2. Format-level bit_rate — reliable for all containers
3. Derived from size / duration — final fallback
Returned keys
-------------
duration, bit_rate_bps, bit_rate_mbps,
target_bit_rate_bps, target_bit_rate_mbps,
size_bytes, size_gb, codec, width, height
"""
cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries',
'format=duration,bit_rate,size:stream=codec_name,width,height,bit_rate',
'-of', 'json',
filepath,
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
data = json.loads(result.stdout)
fmt = data.get('format', {})
stream = (data.get('streams') or [{}])[0]
duration = float(fmt.get('duration', 0))
size_bytes = int(fmt.get('size', 0))
codec = stream.get('codec_name', 'unknown')
width = stream.get('width', 0)
height = stream.get('height', 0)
stream_br = int(stream.get('bit_rate') or 0)
format_br = int(fmt.get('bit_rate') or 0)
if stream_br > 0:
bit_rate = stream_br
elif format_br > 0:
bit_rate = format_br
elif duration > 0:
bit_rate = int((size_bytes * 8) / duration)
else:
bit_rate = 0
# Target ≈ 1/3 of the total bitrate; reserve 128 kbps for audio.
audio_bps = 128_000
video_bps = bit_rate - audio_bps if bit_rate > audio_bps else bit_rate
target_video_bps = max(int(video_bps / 3), 200_000)
return {
'duration': duration,
'bit_rate_bps': bit_rate,
'bit_rate_mbps': round(bit_rate / 1_000_000, 2),
'target_bit_rate_bps': target_video_bps,
'target_bit_rate_mbps': round(target_video_bps / 1_000_000, 2),
'size_bytes': size_bytes,
'size_gb': round(size_bytes / (1024 ** 3), 3),
'codec': codec,
'width': width,
'height': height,
}
except Exception:
return None
# ---------------------------------------------------------------------------
# Directory scanner
# ---------------------------------------------------------------------------
def list_video_files(directory: Path, min_size_gb: float) -> list[dict]:
    """
    Recursively walk *directory* and return video files of at least
    *min_size_gb* gibibytes (the comparison is inclusive).

    Directories whose name starts with a dot are not descended into.
    A file counts as video when its lowercased suffix is in
    VIDEO_EXTENSIONS. Files whose size cannot be read are skipped.

    Each entry is a dict with: path, name, size_bytes, size_gb.

    Raises PermissionError if the root directory is inaccessible.
    Unreadable sub-directories are skipped silently.
    """
    min_bytes = min_size_gb * (1024 ** 3)
    root_dir = os.fspath(directory)

    def _on_walk_error(err: OSError) -> None:
        # os.walk() ignores listing errors unless an onerror callback is
        # given, so without this hook an unreadable root would just yield
        # an empty result. Only the root is escalated; deeper failures keep
        # the best-effort behaviour.
        if isinstance(err, PermissionError) and err.filename == root_dir:
            raise err

    results: list[dict] = []
    try:
        for root, dirs, files in os.walk(root_dir, onerror=_on_walk_error):
            # Prune hidden directories in place so os.walk skips them.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for fname in files:
                if Path(fname).suffix.lower() not in VIDEO_EXTENSIONS:
                    continue
                fpath = os.path.join(root, fname)
                try:
                    fsize = os.path.getsize(fpath)
                except OSError:
                    # File vanished or is unreadable: skip it.
                    continue
                if fsize >= min_bytes:
                    results.append({
                        'path': fpath,
                        'name': fname,
                        'size_bytes': fsize,
                        'size_gb': round(fsize / (1024 ** 3), 3),
                    })
    except PermissionError as exc:
        raise PermissionError(f"Cannot access directory: {exc}") from exc
    return results