#!/usr/bin/env python3
"""
Video Compressor Web Application — WSGI/Docker edition

See wsgi.py for the Gunicorn entry point.
"""
import os
import json
import subprocess
import threading
import time
from pathlib import Path

from flask import (
    Flask,
    Response,
    jsonify,
    render_template,
    request,
    stream_with_context,
)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
BASE_DIR = Path(__file__).resolve().parent
MEDIA_ROOT = Path(os.environ.get('MEDIA_ROOT', '/media')).resolve()

app = Flask(
    __name__,
    template_folder=str(BASE_DIR / 'templates'),
    static_folder=str(BASE_DIR / 'static'),
)

# job_id -> job dict.  `job_lock` guards insertion/lookup in this registry;
# each job additionally carries its own 'lock' guarding its mutable fields
# ('events', 'status', 'process', 'cancelled', 'current_index').
active_jobs: dict = {}
job_lock = threading.Lock()


# ---------------------------------------------------------------------------
# Path-jail helper — every client-supplied path passes through this
# ---------------------------------------------------------------------------
def safe_path(raw: str) -> Path:
    """
    Resolve *raw* and assert it is inside MEDIA_ROOT.

    Raises PermissionError on any attempt to escape the root.
    """
    try:
        resolved = Path(raw).resolve()
    except Exception:
        raise PermissionError(f"Invalid path: {raw!r}")
    root_str = str(MEDIA_ROOT)
    path_str = str(resolved)
    # Exact match OR "root + separator" prefix — a bare startswith() check
    # would let e.g. /media2 slip past a /media root.
    if path_str != root_str and not path_str.startswith(root_str + os.sep):
        raise PermissionError(
            f"Access denied: path is outside the allowed media root ({MEDIA_ROOT})."
        )
    return resolved


# ---------------------------------------------------------------------------
# FFprobe / filesystem helpers
# ---------------------------------------------------------------------------
VIDEO_EXTENSIONS = {
    '.mp4', '.mkv', '.mov', '.avi', '.wmv', '.flv', '.webm', '.m4v',
    '.mpg', '.mpeg', '.ts', '.mts', '.m2ts', '.vob', '.ogv', '.3gp', '.3g2',
}


def get_video_info(filepath: str) -> dict | None:
    """
    Use ffprobe to get duration, total bitrate, codec, and dimensions.

    Bitrate resolution strategy (handles HEVC/MKV where stream-level
    bit_rate is absent):
      1. Stream-level bit_rate — present for H.264/MP4, often missing for HEVC
      2. Format-level bit_rate — reliable for all containers
      3. Derived from size/duration — final fallback

    Returns None when ffprobe fails or its output cannot be parsed.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries',
        'format=duration,bit_rate,size:stream=codec_name,width,height,bit_rate',
        '-of', 'json',
        filepath,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            return None
        data = json.loads(result.stdout)
        fmt = data.get('format', {})
        stream = (data.get('streams') or [{}])[0]

        duration = float(fmt.get('duration', 0))
        size_bytes = int(fmt.get('size', 0))
        codec = stream.get('codec_name', 'unknown')
        width = stream.get('width', 0)
        height = stream.get('height', 0)

        # Prefer stream-level bitrate, fall back to format-level, then derive.
        # `or 0` also covers the case where ffprobe reports "N/A"/None.
        stream_br = int(stream.get('bit_rate') or 0)
        format_br = int(fmt.get('bit_rate') or 0)
        if stream_br > 0:
            bit_rate = stream_br
        elif format_br > 0:
            bit_rate = format_br
        elif duration > 0:
            bit_rate = int((size_bytes * 8) / duration)
        else:
            bit_rate = 0

        # Target ≈ 1/3 of the total bitrate, reserving 128 kbps for audio.
        # For HEVC sources the format bitrate already includes audio, so the
        # same formula applies regardless of codec.  Floor of 200 kbps keeps
        # pathological inputs from producing an unusable target.
        audio_bps = 128_000
        video_bps = bit_rate - audio_bps if bit_rate > audio_bps else bit_rate
        target_video_bps = max(int(video_bps / 3), 200_000)

        return {
            'duration': duration,
            'bit_rate_bps': bit_rate,
            'bit_rate_mbps': round(bit_rate / 1_000_000, 2),
            'target_bit_rate_bps': target_video_bps,
            'target_bit_rate_mbps': round(target_video_bps / 1_000_000, 2),
            'size_bytes': size_bytes,
            'size_gb': round(size_bytes / (1024 ** 3), 3),
            'codec': codec,
            'width': width,
            'height': height,
        }
    except Exception:
        # Best-effort probe: callers treat None as "no metadata available".
        return None


def list_video_files(directory: Path, min_size_gb: float) -> list:
    """
    Recursively collect video files under *directory* that are at least
    *min_size_gb* in size.  Hidden directories are pruned from the walk.

    Raises PermissionError when the directory itself is unreadable; files
    that vanish or are unreadable mid-walk are silently skipped.
    """
    min_bytes = min_size_gb * (1024 ** 3)
    results = []
    try:
        for root, dirs, files in os.walk(directory):
            # In-place prune so os.walk does not descend into dot-dirs.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            for fname in files:
                if Path(fname).suffix.lower() in VIDEO_EXTENSIONS:
                    fpath = os.path.join(root, fname)
                    try:
                        fsize = os.path.getsize(fpath)
                        if fsize >= min_bytes:
                            results.append({
                                'path': fpath,
                                'name': fname,
                                'size_bytes': fsize,
                                'size_gb': round(fsize / (1024 ** 3), 3),
                            })
                    except OSError:
                        continue
    except PermissionError as exc:
        raise PermissionError(f"Cannot access directory: {exc}") from exc
    return results


# ---------------------------------------------------------------------------
# Routes — UI
# ---------------------------------------------------------------------------
@app.route('/')
def index():
    """Serve the single-page UI."""
    return render_template('index.html', media_root=str(MEDIA_ROOT))


# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------
@app.route('/api/config')
def api_config():
    """Expose server-side config the frontend needs (e.g. media root)."""
    return jsonify({'media_root': str(MEDIA_ROOT)})


@app.route('/api/browse')
def browse_directory():
    """List one directory level (dirs first, case-insensitive by name)."""
    raw = request.args.get('path', str(MEDIA_ROOT))
    try:
        path = safe_path(raw)
    except PermissionError as exc:
        return jsonify({'error': str(exc)}), 403
    if not path.exists():
        return jsonify({'error': 'Path does not exist'}), 404
    if not path.is_dir():
        return jsonify({'error': 'Not a directory'}), 400
    try:
        entries = []
        for entry in sorted(path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
            if entry.name.startswith('.'):
                continue
            entries.append({'name': entry.name, 'path': str(entry), 'is_dir': entry.is_dir()})
        # No parent link when already at the media root — keeps the UI jailed.
        parent = str(path.parent) if path != MEDIA_ROOT else None
        return jsonify({
            'current': str(path),
            'parent': parent,
            'entries': entries,
            'media_root': str(MEDIA_ROOT),
        })
    except PermissionError:
        return jsonify({'error': 'Permission denied'}), 403


@app.route('/api/scan', methods=['POST'])
def scan_directory():
    """
    Scan a directory for large videos and enrich each hit with ffprobe
    metadata.  When ffprobe fails, estimate bitrate assuming a 90-minute
    runtime so the frontend still has something to display.
    """
    data = request.get_json(silent=True) or {}
    raw_dir = data.get('directory', '')
    min_size_gb = float(data.get('min_size_gb', 1.0))
    if not raw_dir:
        return jsonify({'error': 'No directory provided'}), 400
    try:
        directory = safe_path(raw_dir)
    except PermissionError as exc:
        return jsonify({'error': str(exc)}), 403
    if not directory.is_dir():
        return jsonify({'error': 'Invalid directory'}), 400
    try:
        files = list_video_files(directory, min_size_gb)
    except PermissionError as exc:
        return jsonify({'error': str(exc)}), 403

    enriched = []
    for f in files:
        info = get_video_info(f['path'])
        if info:
            f.update(info)
        else:
            # Probe failed: estimate from size over an assumed 90-minute runtime.
            f['bit_rate_mbps'] = round((f['size_bytes'] * 8) / (90 * 60 * 1_000_000), 2)
            f['target_bit_rate_mbps'] = round(f['bit_rate_mbps'] / 3, 2)
            f['bit_rate_bps'] = int(f['bit_rate_mbps'] * 1_000_000)
            f['target_bit_rate_bps'] = int(f['target_bit_rate_mbps'] * 1_000_000)
            f['duration'] = 0
            f['codec'] = 'unknown'
            f['width'] = 0
            f['height'] = 0
        enriched.append(f)

    enriched.sort(key=lambda x: x['size_bytes'], reverse=True)
    return jsonify({'files': enriched, 'count': len(enriched)})


@app.route('/api/compress/start', methods=['POST'])
def start_compression():
    """Validate the file list, register a job, and start a worker thread."""
    data = request.get_json(silent=True) or {}
    files = data.get('files', [])
    suffix = data.get('suffix', '_new')
    if not files:
        return jsonify({'error': 'No files provided'}), 400
    # Re-validate every path server-side — the client list is untrusted.
    for f in files:
        try:
            safe_path(f.get('path', ''))
        except PermissionError as exc:
            return jsonify({'error': str(exc)}), 403

    job_id = f"job_{int(time.time() * 1000)}"
    job = {
        'id': job_id,
        'files': files,
        'suffix': suffix,
        'status': 'running',
        'current_index': 0,
        'total': len(files),
        'events': [],
        'process': None,
        'cancelled': False,
        'lock': threading.Lock(),
    }
    with job_lock:
        active_jobs[job_id] = job
    threading.Thread(target=run_compression_job, args=(job_id,), daemon=True).start()
    return jsonify({'job_id': job_id})


# FIX: the URL rule was missing the <job_id> variable, so Flask would invoke
# the view without the job_id argument and every request raised TypeError.
@app.route('/api/compress/progress/<job_id>')
def compression_progress(job_id):
    """
    SSE stream.  Works under Gunicorn+gevent: time.sleep() yields the
    greenlet instead of blocking a real OS thread.
    """
    def event_stream():
        last_idx = 0
        while True:
            with job_lock:
                job = active_jobs.get(job_id)
            if not job:
                yield f"data: {json.dumps({'type': 'error', 'message': 'Job not found'})}\n\n"
                return
            with job['lock']:
                new_events = job['events'][last_idx:]
                last_idx += len(new_events)
                status = job['status']
            for event in new_events:
                yield f"data: {json.dumps(event)}\n\n"
            # Only stop once the job is finished AND the backlog is drained.
            if status in ('done', 'cancelled', 'error') and not new_events:
                break
            time.sleep(0.25)

    return Response(
        stream_with_context(event_stream()),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
    )


# FIX: same missing <job_id> URL variable as the progress route.
@app.route('/api/compress/cancel/<job_id>', methods=['POST'])
def cancel_compression(job_id):
    """Flag the job cancelled and terminate (then kill) any live ffmpeg."""
    with job_lock:
        job = active_jobs.get(job_id)
    if not job:
        return jsonify({'error': 'Job not found'}), 404
    with job['lock']:
        job['cancelled'] = True
        proc = job.get('process')
    if proc and proc.poll() is None:
        try:
            proc.terminate()
            time.sleep(1)
            if proc.poll() is None:
                proc.kill()
        except Exception:
            pass
    return jsonify({'status': 'cancellation requested'})


# ---------------------------------------------------------------------------
# Compression worker
# ---------------------------------------------------------------------------
def push_event(job: dict, event: dict) -> None:
    """Append an SSE event to the job's queue under its lock."""
    with job['lock']:
        job['events'].append(event)


def run_compression_job(job_id: str) -> None:
    """
    Worker thread: re-encode each file in the job at ~1/3 of its source
    bitrate, emitting progress events consumed by the SSE route.
    """
    with job_lock:
        job = active_jobs.get(job_id)
    if not job:
        return

    files = job['files']
    suffix = job['suffix']
    total = job['total']
    push_event(job, {'type': 'start', 'total': total,
                     'message': f'Starting compression of {total} file(s)'})

    for idx, file_info in enumerate(files):
        with job['lock']:
            cancelled = job['cancelled']
        if cancelled:
            push_event(job, {'type': 'cancelled', 'message': 'Compression cancelled by user'})
            with job['lock']:
                job['status'] = 'cancelled'
            return

        src_path = file_info['path']
        target_bitrate = file_info.get('target_bit_rate_bps', 1_000_000)
        src_codec = file_info.get('codec', 'unknown').lower()
        p = Path(src_path)
        out_path = str(p.parent / (p.stem + suffix + p.suffix))

        # Choose encoder to match the source codec.
        #   hevc / h265 / x265 → libx265
        #   everything else    → libx264 (safe, universally supported)
        is_hevc = src_codec in ('hevc', 'h265', 'x265')
        encoder = 'libx265' if is_hevc else 'libx264'

        push_event(job, {
            'type': 'file_start',
            'index': idx,
            'total': total,
            'filename': p.name,
            'output': out_path,
            'encoder': encoder,
            'message': f'Compressing ({idx + 1}/{total}): {p.name} [{encoder}]',
        })

        # Duration is needed to turn ffmpeg's out_time_ms into a percentage.
        try:
            probe = subprocess.run(
                ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
                 '-of', 'default=noprint_wrappers=1:nokey=1', src_path],
                capture_output=True, text=True, timeout=30,
            )
            duration_secs = float(probe.stdout.strip()) if probe.stdout.strip() else 0
        except Exception:
            duration_secs = 0

        video_k = max(int(target_bitrate / 1000), 200)

        # Build the encoder-specific part of the ffmpeg command.
        #
        # libx264 uses -maxrate / -bufsize for VBV (Video Buffering Verifier).
        # libx265 passes those same constraints via -x265-params because its
        # CLI option names differ from the generic ffmpeg flags.
        # Both use AAC audio at 128 kbps.
        # -movflags +faststart is only meaningful for MP4 containers; it is
        # harmless (silently ignored) for MKV/MOV/etc.
        if is_hevc:
            vbv_maxrate = int(video_k * 1.5)
            vbv_bufsize = video_k * 2
            encoder_opts = [
                '-c:v', 'libx265',
                '-b:v', f'{video_k}k',
                '-x265-params', f'vbv-maxrate={vbv_maxrate}:vbv-bufsize={vbv_bufsize}',
            ]
        else:
            encoder_opts = [
                '-c:v', 'libx264',
                '-b:v', f'{video_k}k',
                '-maxrate', f'{int(video_k * 1.5)}k',
                '-bufsize', f'{video_k * 2}k',
            ]

        cmd = [
            'ffmpeg', '-y', '-i', src_path,
            *encoder_opts,
            '-c:a', 'aac', '-b:a', '128k',
            '-movflags', '+faststart',
            '-progress', 'pipe:1', '-nostats',
            out_path,
        ]

        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, text=True, bufsize=1)
            with job['lock']:
                job['process'] = proc

            # FIX: drain stderr concurrently.  The original read stderr only
            # after proc.wait(); if ffmpeg wrote more than the OS pipe buffer
            # (~64 KB) of warnings, the child blocked on stderr while we
            # blocked on stdout — a classic two-pipe deadlock.
            stderr_chunks: list = []

            def _drain_stderr(pipe, sink):
                try:
                    for chunk in pipe:
                        sink.append(chunk)
                except Exception:
                    pass

            drain = threading.Thread(target=_drain_stderr,
                                     args=(proc.stderr, stderr_chunks),
                                     daemon=True)
            drain.start()

            for line in proc.stdout:
                with job['lock']:
                    cancelled = job['cancelled']
                if cancelled:
                    proc.terminate()
                    break
                line = line.strip()
                if '=' not in line:
                    continue
                key, _, value = line.partition('=')
                key, value = key.strip(), value.strip()
                if key == 'out_time_ms' and duration_secs > 0:
                    try:
                        # Despite the name, ffmpeg's out_time_ms is in
                        # MICROseconds — divide by 1e6 for seconds.
                        elapsed = int(value) / 1_000_000
                        pct = min(100.0, (elapsed / duration_secs) * 100)
                        push_event(job, {
                            'type': 'progress',
                            'index': idx,
                            'percent': round(pct, 1),
                            'elapsed_secs': round(elapsed, 1),
                            'duration_secs': round(duration_secs, 1),
                        })
                    except (ValueError, ZeroDivisionError):
                        pass
                elif key == 'progress' and value == 'end':
                    push_event(job, {
                        'type': 'progress',
                        'index': idx,
                        'percent': 100.0,
                        'elapsed_secs': duration_secs,
                        'duration_secs': duration_secs,
                    })

            proc.wait()
            drain.join(timeout=5)

            with job['lock']:
                cancelled = job['cancelled']
            if cancelled:
                # Remove the partial output before reporting cancellation.
                try:
                    if os.path.exists(out_path):
                        os.remove(out_path)
                except OSError:
                    pass
                push_event(job, {'type': 'cancelled', 'message': 'Compression cancelled by user'})
                with job['lock']:
                    job['status'] = 'cancelled'
                return

            if proc.returncode != 0:
                # Last 500 chars of stderr, collected by the drain thread.
                tail = ''.join(stderr_chunks)[-500:]
                push_event(job, {
                    'type': 'file_error',
                    'index': idx,
                    'filename': p.name,
                    'message': f'ffmpeg exited with code {proc.returncode}',
                    'detail': tail,
                })
            else:
                try:
                    out_sz = os.path.getsize(out_path)
                    out_gb = round(out_sz / (1024 ** 3), 3)
                    orig_sz = file_info.get('size_bytes', 0)
                    reduction = round((1 - out_sz / orig_sz) * 100, 1) if orig_sz else 0
                except OSError:
                    out_gb = 0
                    reduction = 0
                push_event(job, {
                    'type': 'file_done',
                    'index': idx,
                    'filename': p.name,
                    'output': out_path,
                    'output_size_gb': out_gb,
                    'reduction_pct': reduction,
                    'message': f'Completed: {p.name} → saved {reduction}%',
                })
            with job['lock']:
                job['current_index'] = idx + 1
        except Exception as exc:
            push_event(job, {
                'type': 'file_error',
                'index': idx,
                'filename': p.name,
                'message': f'Exception: {exc}',
            })

    push_event(job, {'type': 'done', 'message': f'All {total} file(s) processed.'})
    with job['lock']:
        job['status'] = 'done'


# ---------------------------------------------------------------------------
# Dev-server entry point (production: gunicorn -c gunicorn.conf.py wsgi:app)
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import sys
    port = int(sys.argv[1]) if len(sys.argv) > 1 else 5000
    print(f"\n{'='*60}")
    print(f"  VideoPress — dev server http://localhost:{port}")
    print(f"  MEDIA_ROOT : {MEDIA_ROOT}")
    print(f"{'='*60}\n")
    app.run(host='0.0.0.0', port=port, debug=False, threaded=True)