#!/usr/bin/env python3
"""
Video Compressor Web Application — WSGI/Docker edition

See wsgi.py for the Gunicorn entry point.
"""

import json
import math
import os
import subprocess
import threading
import time
import uuid
from collections import deque
from pathlib import Path

from flask import Flask, request, jsonify, Response, render_template, stream_with_context

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

BASE_DIR = Path(__file__).resolve().parent
MEDIA_ROOT = Path(os.environ.get('MEDIA_ROOT', '/media')).resolve()

app = Flask(
    __name__,
    template_folder=str(BASE_DIR / 'templates'),
    static_folder=str(BASE_DIR / 'static'),
)

# job_id -> job dict (schema: see start_compression). job_lock guards the
# mapping itself; each job's mutable fields are guarded by its own job['lock'].
# NOTE(review): entries are never removed, so finished jobs (and their event
# lists) accumulate for the lifetime of the process.
active_jobs: dict = {}
job_lock = threading.Lock()

# Job states after which the worker thread has finished.
TERMINAL_STATUSES = ('done', 'cancelled', 'error')


# ---------------------------------------------------------------------------
# Path-jail helper — every client-supplied path passes through this
# ---------------------------------------------------------------------------

def safe_path(raw: str) -> Path:
    """
    Resolve *raw* and assert it is inside MEDIA_ROOT.

    Relative paths are interpreted relative to MEDIA_ROOT (not the process's
    working directory). Symlinks are resolved before the check, so a link that
    points outside the root is rejected.

    Raises PermissionError on any attempt to escape the root, or when *raw*
    cannot be interpreted as a path at all (wrong type, embedded NUL, ...).
    """
    try:
        # Joining an absolute path onto MEDIA_ROOT yields that absolute path
        # unchanged, so absolute inputs behave exactly as before.
        resolved = (MEDIA_ROOT / raw).resolve()
    except (TypeError, ValueError, OSError, RuntimeError) as exc:
        raise PermissionError(f"Invalid path: {raw!r}") from exc

    # is_relative_to compares whole path components, so it also works when
    # MEDIA_ROOT is "/" (where `startswith(root + os.sep)` rejects everything).
    if not resolved.is_relative_to(MEDIA_ROOT):
        raise PermissionError(
            f"Access denied: path is outside the allowed media root ({MEDIA_ROOT})."
        )
    return resolved


# ---------------------------------------------------------------------------
# FFprobe / filesystem helpers
# ---------------------------------------------------------------------------

VIDEO_EXTENSIONS = {
    '.mp4', '.mkv', '.mov', '.avi', '.wmv', '.flv', '.webm',
    '.m4v', '.mpg', '.mpeg', '.ts', '.mts', '.m2ts', '.vob',
    '.ogv', '.3gp', '.3g2',
}

# Bitrate reserved for the AAC track the encoder is asked to produce (-b:a).
AUDIO_BPS = 128_000
# Lowest video bitrate ever targeted, both in scan estimates and in ffmpeg.
MIN_VIDEO_BPS = 200_000


def get_video_info(filepath: str) -> dict | None:
    """
    Probe *filepath* with ffprobe and derive a compression target.

    Returns a dict with the duration (seconds), current and target bitrates
    (bps and Mbps), size (bytes and GiB), and the first video stream's codec,
    width and height. Returns None if ffprobe is missing, fails, times out, or
    emits output that cannot be interpreted.

    The target is one third of the current video bitrate (total bitrate minus
    AUDIO_BPS for the audio track), floored at MIN_VIDEO_BPS.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'format=duration,bit_rate,size:stream=codec_name,width,height',
        '-of', 'json',
        filepath,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            return None
        data = json.loads(result.stdout)
        fmt = data.get('format', {})
        stream = (data.get('streams') or [{}])[0]

        duration = float(fmt.get('duration', 0))
        bit_rate = int(fmt.get('bit_rate', 0))
        size_bytes = int(fmt.get('size', 0))
        codec = stream.get('codec_name', 'unknown')
        width = stream.get('width', 0)
        height = stream.get('height', 0)
    except (OSError, subprocess.SubprocessError, ValueError, TypeError, AttributeError):
        # ffprobe unavailable / timed out, or output we cannot interpret.
        return None

    if bit_rate == 0 and duration > 0:
        # Some containers report no overall bitrate; estimate it from size/duration.
        bit_rate = int((size_bytes * 8) / duration)

    video_bps = bit_rate - AUDIO_BPS if bit_rate > AUDIO_BPS else bit_rate
    target_video_bps = max(int(video_bps / 3), MIN_VIDEO_BPS)

    return {
        'duration': duration,
        'bit_rate_bps': bit_rate,
        'bit_rate_mbps': round(bit_rate / 1_000_000, 2),
        'target_bit_rate_bps': target_video_bps,
        'target_bit_rate_mbps': round(target_video_bps / 1_000_000, 2),
        'size_bytes': size_bytes,
        'size_gb': round(size_bytes / (1024 ** 3), 3),
        'codec': codec,
        'width': width,
        'height': height,
    }


def list_video_files(directory: Path, min_size_gb: float) -> list:
    """
    Recursively collect video files under *directory* of at least *min_size_gb* GiB.

    Hidden directories (names starting with '.') are not descended into.
    Unreadable subdirectories, files that vanish mid-scan, and files whose real
    location lies outside MEDIA_ROOT (via symlinks) are skipped.

    Returns a list of {'path', 'name', 'size_bytes', 'size_gb'} dicts in walk order.
    Raises PermissionError if *directory* itself cannot be read.
    """
    min_bytes = min_size_gb * (1024 ** 3)
    root_dir = Path(directory)

    def on_walk_error(err: OSError) -> None:
        # os.walk silently ignores errors unless given onerror. Surface a
        # failure to read the scan root itself; skip unreadable subdirectories.
        if (isinstance(err, PermissionError)
                and err.filename is not None
                and Path(err.filename) == root_dir):
            raise PermissionError(f"Cannot access directory: {err}") from err

    results = []
    for root, dirs, files in os.walk(root_dir, onerror=on_walk_error):
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        for fname in files:
            if Path(fname).suffix.lower() not in VIDEO_EXTENSIONS:
                continue
            fpath = os.path.join(root, fname)
            try:
                safe_path(fpath)  # symlinked file escaping the jail
                fsize = os.path.getsize(fpath)
            except OSError:  # PermissionError is a subclass
                continue
            if fsize >= min_bytes:
                results.append({
                    'path': fpath,
                    'name': fname,
                    'size_bytes': fsize,
                    'size_gb': round(fsize / (1024 ** 3), 3),
                })
    return results


def _json_body() -> dict:
    """Return the request's JSON object body, or {} if missing, malformed, or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


# ---------------------------------------------------------------------------
# Routes — UI
# ---------------------------------------------------------------------------

@app.route('/')
def index():
    return render_template('index.html', media_root=str(MEDIA_ROOT))


# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------

@app.route('/api/config')
def api_config():
    """Expose server-side config the frontend needs (e.g. media root)."""
    return jsonify({'media_root': str(MEDIA_ROOT)})


@app.route('/api/browse')
def browse_directory():
    """List the non-hidden entries of a directory inside MEDIA_ROOT, directories first."""
    raw = request.args.get('path', str(MEDIA_ROOT))
    try:
        path = safe_path(raw)
    except PermissionError as exc:
        return jsonify({'error': str(exc)}), 403

    # os.path.exists/isdir return False instead of raising on permission errors.
    if not os.path.exists(path):
        return jsonify({'error': 'Path does not exist'}), 404
    if not os.path.isdir(path):
        return jsonify({'error': 'Not a directory'}), 400

    try:
        entries = [
            {'name': entry.name, 'path': str(entry), 'is_dir': entry.is_dir()}
            for entry in sorted(path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
            if not entry.name.startswith('.')
        ]
    except PermissionError:
        return jsonify({'error': 'Permission denied'}), 403

    parent = str(path.parent) if path != MEDIA_ROOT else None
    return jsonify({
        'current': str(path),
        'parent': parent,
        'entries': entries,
        'media_root': str(MEDIA_ROOT),
    })


@app.route('/api/scan', methods=['POST'])
def scan_directory():
    """
    Find large video files under a directory and attach bitrate estimates.

    JSON body: {'directory': str, 'min_size_gb': number (default 1.0)}.
    Responds with {'files': [...], 'count': int}, largest files first.
    """
    data = _json_body()
    raw_dir = data.get('directory', '')
    try:
        min_size_gb = float(data.get('min_size_gb', 1.0))
    except (TypeError, ValueError):
        return jsonify({'error': 'min_size_gb must be a number'}), 400

    if not raw_dir:
        return jsonify({'error': 'No directory provided'}), 400

    try:
        directory = safe_path(raw_dir)
    except PermissionError as exc:
        return jsonify({'error': str(exc)}), 403

    if not os.path.isdir(directory):
        return jsonify({'error': 'Invalid directory'}), 400

    try:
        files = list_video_files(directory, min_size_gb)
    except PermissionError as exc:
        return jsonify({'error': str(exc)}), 403

    for f in files:
        info = get_video_info(f['path'])
        if info:
            f.update(info)
        else:
            # ffprobe failed: estimate the bitrate assuming a 90-minute runtime.
            f['bit_rate_mbps'] = round((f['size_bytes'] * 8) / (90 * 60 * 1_000_000), 2)
            f['target_bit_rate_mbps'] = round(f['bit_rate_mbps'] / 3, 2)
            f['bit_rate_bps'] = int(f['bit_rate_mbps'] * 1_000_000)
            f['target_bit_rate_bps'] = int(f['target_bit_rate_mbps'] * 1_000_000)
            f['duration'] = 0
            f['codec'] = 'unknown'
            f['width'] = 0
            f['height'] = 0

    files.sort(key=lambda x: x['size_bytes'], reverse=True)
    return jsonify({'files': files, 'count': len(files)})


def _is_valid_suffix(suffix) -> bool:
    """
    Return True if *suffix* can be spliced into an output file name safely.

    It must be a non-empty string (an empty suffix would make the output path
    equal the input, and ffmpeg -y would overwrite the source) and must not
    contain path separators or NUL (which would let the output escape the
    source's directory).
    """
    if not isinstance(suffix, str) or not suffix:
        return False
    forbidden = {'/', '\x00', os.sep}
    if os.altsep:
        forbidden.add(os.altsep)
    return not any(ch in suffix for ch in forbidden)


@app.route('/api/compress/start', methods=['POST'])
def start_compression():
    """
    Validate the request and start a background compression job.

    JSON body: {'files': [{'path': str, 'target_bit_rate_bps': int, ...}, ...],
                'suffix': str (default '_new')}.
    Each output is written next to its source as <stem><suffix><ext>.
    Responds with {'job_id': str}; progress is streamed from
    /api/compress/progress/<job_id>.
    """
    data = _json_body()
    files = data.get('files', [])
    suffix = data.get('suffix', '_new')

    if not files:
        return jsonify({'error': 'No files provided'}), 400
    if not isinstance(files, list):
        return jsonify({'error': 'files must be a list'}), 400
    if not _is_valid_suffix(suffix):
        return jsonify({'error': 'Invalid suffix: must be non-empty and contain no path separators'}), 400

    validated = []
    for f in files:
        if not isinstance(f, dict):
            return jsonify({'error': 'Each file entry must be an object'}), 400
        try:
            src = safe_path(f.get('path', ''))
        except PermissionError as exc:
            return jsonify({'error': str(exc)}), 403
        if not os.path.isfile(src):
            return jsonify({'error': f'Not a file: {src}'}), 400
        # Keep the resolved, validated path so the worker uses exactly what was checked.
        validated.append({**f, 'path': str(src)})

    # Timestamp alone can collide for requests in the same millisecond.
    job_id = f"job_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
    job = {
        'id': job_id,
        'files': validated,
        'suffix': suffix,
        'status': 'running',      # becomes one of TERMINAL_STATUSES
        'current_index': 0,       # number of files finished so far
        'total': len(validated),
        'events': [],             # append-only log consumed by the SSE stream
        'process': None,          # current ffmpeg Popen, for cancellation
        'cancelled': False,
        'lock': threading.Lock(),
    }
    with job_lock:
        active_jobs[job_id] = job

    threading.Thread(target=run_compression_job, args=(job_id,), daemon=True).start()
    return jsonify({'job_id': job_id})


@app.route('/api/compress/progress/<job_id>')
def compression_progress(job_id):
    """
    SSE stream. Works under Gunicorn+gevent: time.sleep() yields the greenlet
    instead of blocking a real OS thread.

    Replays every event of the job from the beginning, then polls for new ones
    every 0.25 s until the job reaches a terminal status.
    """
    def event_stream():
        sent = 0
        while True:
            with job_lock:
                job = active_jobs.get(job_id)
            if not job:
                yield f"data: {json.dumps({'type': 'error', 'message': 'Job not found'})}\n\n"
                return

            # Events and status are read together so a terminal status is never
            # observed before the event that announced it.
            with job['lock']:
                pending = job['events'][sent:]
                status = job['status']
            sent += len(pending)

            for event in pending:
                yield f"data: {json.dumps(event)}\n\n"

            if status in TERMINAL_STATUSES and not pending:
                break
            time.sleep(0.25)

    return Response(
        stream_with_context(event_stream()),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
    )


@app.route('/api/compress/cancel/<job_id>', methods=['POST'])
def cancel_compression(job_id):
    """Flag job *job_id* as cancelled and stop its running ffmpeg process, if any."""
    with job_lock:
        job = active_jobs.get(job_id)
    if not job:
        return jsonify({'error': 'Job not found'}), 404

    # Set the flag and grab the process under the lock, but signal and wait
    # outside it so the worker and the SSE stream are not stalled meanwhile.
    with job['lock']:
        job['cancelled'] = True
        proc = job.get('process')

    if proc is not None and proc.poll() is None:
        try:
            proc.terminate()
            try:
                proc.wait(timeout=1)
            except subprocess.TimeoutExpired:
                proc.kill()
        except OSError:
            pass  # process exited between poll() and the signal

    return jsonify({'status': 'cancellation requested'})


# ---------------------------------------------------------------------------
# Compression worker
# ---------------------------------------------------------------------------

def push_event(job: dict, event: dict) -> None:
    """Append *event* to the job's event log (read by the SSE stream)."""
    with job['lock']:
        job['events'].append(event)


def _is_cancelled(job: dict) -> bool:
    """Return the job's cancellation flag, read under its lock."""
    with job['lock']:
        return job['cancelled']


def _set_status(job: dict, status: str) -> None:
    """Set the job's status under its lock."""
    with job['lock']:
        job['status'] = status


def _mark_cancelled(job: dict) -> None:
    """Emit the 'cancelled' event, then move the job to the 'cancelled' state."""
    push_event(job, {'type': 'cancelled', 'message': 'Compression cancelled by user'})
    _set_status(job, 'cancelled')


def _probe_duration(src_path: str) -> float:
    """Return the container duration of *src_path* in seconds, or 0 if unknown."""
    try:
        probe = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
             '-of', 'default=noprint_wrappers=1:nokey=1', src_path],
            capture_output=True, text=True, timeout=30,
        )
        text = probe.stdout.strip()
        return float(text) if text else 0
    except (OSError, subprocess.SubprocessError, ValueError):
        return 0


def _video_kbps(file_info: dict) -> int:
    """
    Return the -b:v target in kbps for a (client-supplied) file entry.

    Uses 'target_bit_rate_bps' (default 1 Mbps), floored at MIN_VIDEO_BPS.
    Values that are not finite numbers fall back to the default instead of
    crashing the worker thread.
    """
    try:
        bps = float(file_info.get('target_bit_rate_bps', 1_000_000))
    except (TypeError, ValueError):
        bps = 1_000_000.0
    if not math.isfinite(bps):
        bps = 1_000_000.0
    return max(int(bps / 1000), MIN_VIDEO_BPS // 1000)


def _ffmpeg_command(src_path: str, out_path: str, video_k: int) -> list[str]:
    """Build the H.264/AAC encode command; progress is written to stdout as key=value lines."""
    return [
        'ffmpeg', '-y', '-i', src_path,
        '-c:v', 'libx264',
        '-b:v', f'{video_k}k',
        '-maxrate', f'{int(video_k * 1.5)}k',
        '-bufsize', f'{video_k * 2}k',
        '-c:a', 'aac', '-b:a', f'{AUDIO_BPS // 1000}k',
        '-movflags', '+faststart',
        '-progress', 'pipe:1',
        '-nostats',
        out_path,
    ]


def _drain_to_tail(stream, tail: deque) -> None:
    """
    Read *stream* to EOF, keeping only the most recent chunks in *tail*.

    ffmpeg's stderr must be consumed while it runs: if the pipe buffer fills,
    ffmpeg blocks writing to it and the stdout progress loop stalls forever.
    """
    try:
        for chunk in iter(lambda: stream.read(4096), ''):
            tail.append(chunk)
    except (OSError, ValueError):
        pass  # stream closed underneath us; what was captured is enough


def _remove_quietly(path: str) -> None:
    """Best-effort delete of a partial output file."""
    try:
        os.remove(path)
    except OSError:
        pass


def _size_report(src_path: str, out_path: str) -> tuple[float, float]:
    """Return (output size in GiB, % size reduction vs. source), or (0, 0) if unreadable."""
    try:
        out_sz = os.path.getsize(out_path)
        orig_sz = os.path.getsize(src_path)
    except OSError:
        return 0, 0
    out_gb = round(out_sz / (1024 ** 3), 3)
    reduction = round((1 - out_sz / orig_sz) * 100, 1) if orig_sz else 0
    return out_gb, reduction


def _compress_file(job: dict, idx: int, file_info: dict) -> bool:
    """
    Encode file number *idx* of *job*, emitting file_start / progress /
    file_done / file_error events.

    Returns False if the job was cancelled while this file was encoding (its
    partial output has been removed); True otherwise, including when this
    particular file failed.
    """
    total = job['total']
    src_path = file_info['path']
    p = Path(src_path)
    out_path = str(p.parent / (p.stem + job['suffix'] + p.suffix))

    push_event(job, {
        'type': 'file_start',
        'index': idx,
        'total': total,
        'filename': p.name,
        'output': out_path,
        'message': f'Compressing ({idx + 1}/{total}): {p.name}',
    })

    try:
        # Refuse to write through an existing output symlink that leaves the jail.
        safe_path(out_path)

        duration_secs = _probe_duration(src_path)
        video_k = _video_kbps(file_info)
        tail: deque = deque(maxlen=4)

        with subprocess.Popen(
            _ffmpeg_command(src_path, out_path, video_k),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1,
        ) as proc:
            with job['lock']:
                job['process'] = proc
            drainer = threading.Thread(target=_drain_to_tail, args=(proc.stderr, tail), daemon=True)
            drainer.start()

            for raw_line in proc.stdout:
                if _is_cancelled(job):
                    proc.terminate()
                    break
                key, sep, value = raw_line.strip().partition('=')
                if not sep:
                    continue
                key, value = key.strip(), value.strip()

                if key == 'out_time_ms' and duration_secs > 0:
                    # Treated as microseconds, which is what ffmpeg actually
                    # emits for this key despite its name.
                    try:
                        elapsed = int(value) / 1_000_000
                    except ValueError:
                        continue  # e.g. 'N/A' before the first frame
                    pct = min(100.0, (elapsed / duration_secs) * 100)
                    push_event(job, {
                        'type': 'progress',
                        'index': idx,
                        'percent': round(pct, 1),
                        'elapsed_secs': round(elapsed, 1),
                        'duration_secs': round(duration_secs, 1),
                    })
                elif key == 'progress' and value == 'end':
                    push_event(job, {
                        'type': 'progress',
                        'index': idx,
                        'percent': 100.0,
                        'elapsed_secs': duration_secs,
                        'duration_secs': duration_secs,
                    })

            proc.wait()
            drainer.join(timeout=5)

        if _is_cancelled(job):
            _remove_quietly(out_path)
            return False

        if proc.returncode != 0:
            push_event(job, {
                'type': 'file_error',
                'index': idx,
                'filename': p.name,
                'message': f'ffmpeg exited with code {proc.returncode}',
                'detail': ''.join(tail)[-500:],
            })
        else:
            out_gb, reduction = _size_report(src_path, out_path)
            push_event(job, {
                'type': 'file_done',
                'index': idx,
                'filename': p.name,
                'output': out_path,
                'output_size_gb': out_gb,
                'reduction_pct': reduction,
                'message': f'Completed: {p.name} → saved {reduction}%',
            })

        with job['lock']:
            job['current_index'] = idx + 1

    except Exception as exc:  # per-file boundary: report and move on to the next file
        push_event(job, {
            'type': 'file_error',
            'index': idx,
            'filename': p.name,
            'message': f'Exception: {exc}',
        })
    return True


def run_compression_job(job_id: str) -> None:
    """
    Worker-thread entry point: compress every file of job *job_id* in order.

    Emits 'start', the per-file events, then exactly one terminal event:
    'done', 'cancelled', or 'error' if the worker itself fails unexpectedly.
    The status is updated only after that event is queued, so the SSE stream
    always delivers it before closing.
    """
    with job_lock:
        job = active_jobs.get(job_id)
    if not job:
        return

    total = job['total']
    try:
        push_event(job, {'type': 'start', 'total': total,
                         'message': f'Starting compression of {total} file(s)'})

        for idx, file_info in enumerate(job['files']):
            if _is_cancelled(job):
                _mark_cancelled(job)
                return
            if not _compress_file(job, idx, file_info):
                _mark_cancelled(job)
                return

        push_event(job, {'type': 'done', 'message': f'All {total} file(s) processed.'})
        _set_status(job, 'done')
    except Exception as exc:  # last resort: never leave a job stuck in 'running'
        push_event(job, {'type': 'error', 'message': f'Internal error: {exc}'})
        _set_status(job, 'error')


# ---------------------------------------------------------------------------
# Dev-server entry point (production: gunicorn -c gunicorn.conf.py wsgi:app)
# ---------------------------------------------------------------------------

if __name__ == '__main__':
    import sys

    port = int(sys.argv[1]) if len(sys.argv) > 1 else 5000
    print(f"\n{'='*60}")
    print(f"  VideoPress — dev server  http://localhost:{port}")
    print(f"  MEDIA_ROOT : {MEDIA_ROOT}")
    print(f"{'='*60}\n")
    app.run(host='0.0.0.0', port=port, debug=False, threaded=True)