""" app/media.py ============ File-system scanning and FFprobe metadata helpers. Public API ---------- VIDEO_EXTENSIONS : frozenset of lowercase video file suffixes get_video_info() : run ffprobe on a single file, return a metadata dict list_video_files(): walk a directory tree and return files above a size floor """ import json import os import subprocess from pathlib import Path # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- VIDEO_EXTENSIONS: frozenset[str] = frozenset({ '.mp4', '.mkv', '.mov', '.avi', '.wmv', '.flv', '.webm', '.m4v', '.mpg', '.mpeg', '.ts', '.mts', '.m2ts', '.vob', '.ogv', '.3gp', '.3g2', }) # --------------------------------------------------------------------------- # FFprobe helper # --------------------------------------------------------------------------- def get_video_info(filepath: str) -> dict | None: """ Use ffprobe to get duration, total bitrate, codec, and dimensions. Returns a dict with the keys below, or None if ffprobe fails. Bitrate resolution order (handles HEVC/MKV where the stream-level bit_rate field is absent): 1. Stream-level bit_rate — present for H.264/MP4, often missing for HEVC 2. Format-level bit_rate — reliable for all containers 3. Derived from size / duration — final fallback Returned keys ------------- duration, bit_rate_bps, bit_rate_mbps, target_bit_rate_bps, target_bit_rate_mbps, size_bytes, size_gb, codec, width, height """ cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'format=duration,bit_rate,size:stream=codec_name,width,height,bit_rate', '-of', 'json', filepath, ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode != 0: return None data = json.loads(result.stdout) fmt = data.get('format', {}) stream = (data.get('streams') or [{}])[0] duration = float(fmt.get('duration', 0)) size_bytes = int(fmt.get('size', 0)) codec = stream.get('codec_name', 'unknown') width = stream.get('width', 0) height = stream.get('height', 0) stream_br = int(stream.get('bit_rate') or 0) format_br = int(fmt.get('bit_rate') or 0) if stream_br > 0: bit_rate = stream_br elif format_br > 0: bit_rate = format_br elif duration > 0: bit_rate = int((size_bytes * 8) / duration) else: bit_rate = 0 # Target ≈ 1/3 of the total bitrate; reserve 128 kbps for audio. audio_bps = 128_000 video_bps = bit_rate - audio_bps if bit_rate > audio_bps else bit_rate target_video_bps = max(int(video_bps / 3), 200_000) return { 'duration': duration, 'bit_rate_bps': bit_rate, 'bit_rate_mbps': round(bit_rate / 1_000_000, 2), 'target_bit_rate_bps': target_video_bps, 'target_bit_rate_mbps': round(target_video_bps / 1_000_000, 2), 'size_bytes': size_bytes, 'size_gb': round(size_bytes / (1024 ** 3), 3), 'codec': codec, 'width': width, 'height': height, } except Exception: return None # --------------------------------------------------------------------------- # Directory scanner # --------------------------------------------------------------------------- def list_video_files(directory: Path, min_size_gb: float) -> list[dict]: """ Recursively walk *directory* and return video files larger than *min_size_gb* gigabytes. Each entry is a dict with: path, name, size_bytes, size_gb. Raises PermissionError if the root directory is inaccessible. """ min_bytes = min_size_gb * (1024 ** 3) results: list[dict] = [] try: for root, dirs, files in os.walk(directory): dirs[:] = [d for d in dirs if not d.startswith('.')] for fname in files: if Path(fname).suffix.lower() in VIDEO_EXTENSIONS: fpath = os.path.join(root, fname) try: fsize = os.path.getsize(fpath) if fsize >= min_bytes: results.append({ 'path': fpath, 'name': fname, 'size_bytes': fsize, 'size_gb': round(fsize / (1024 ** 3), 3), }) except OSError: continue except PermissionError as exc: raise PermissionError(f"Cannot access directory: {exc}") from exc return results