349 lines
11 KiB
Python
349 lines
11 KiB
Python
"""
|
|
app/jobs.py
|
|
===========
|
|
In-process job store and the ffmpeg compression worker thread.
|
|
|
|
Design note: job state is kept in a plain dict protected by a threading.Lock.
|
|
This is intentional — VideoPress uses a single Gunicorn worker process
|
|
(required for SSE streaming with gevent), so cross-process state sharing is
|
|
not needed. If you ever move to multiple workers, replace `active_jobs` with
|
|
a Redis-backed store and remove the threading.Lock.
|
|
|
|
Public API
|
|
----------
|
|
active_jobs : dict {job_id -> job_dict}
|
|
job_lock : Lock protects mutations to active_jobs
|
|
push_event() : append an SSE event to a job's event queue
|
|
run_compression_job(): worker — called in a daemon thread
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from .notify import send_completion_email
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Job store
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Maps job_id -> job dict. Each job dict carries (as read by the code below):
# 'files', 'suffix', 'total', 'events', 'lock', 'cancelled', 'status',
# 'process', 'current_index', and optionally 'notify_email'.
active_jobs: dict = {}

# Guards insertion/lookup in active_jobs. Per-job mutations use the job's own
# finer-grained 'lock' instead (see push_event / run_compression_job).
job_lock = threading.Lock()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def push_event(job: dict, event: dict) -> None:
    """Thread-safely append *event* to ``job['events']``.

    Uses the job's own fine-grained lock (not the global ``job_lock``) so
    concurrent SSE readers and the worker thread never race on the list.
    """
    lock = job['lock']
    with lock:
        job['events'].append(event)
|
|
|
|
|
|
def _choose_encoder(codec: str) -> tuple[str, bool]:
|
|
"""
|
|
Return (ffmpeg_encoder_name, is_hevc) for the given source codec string.
|
|
|
|
HEVC / H.265 sources are re-encoded with libx265 to preserve efficiency.
|
|
Everything else uses libx264 (universally supported, always available).
|
|
"""
|
|
normalised = codec.lower()
|
|
is_hevc = normalised in ('hevc', 'h265', 'x265')
|
|
encoder = 'libx265' if is_hevc else 'libx264'
|
|
return encoder, is_hevc
|
|
|
|
|
|
def _build_ffmpeg_cmd(
|
|
src: str,
|
|
out: str,
|
|
video_k: int,
|
|
is_hevc: bool,
|
|
encoder: str,
|
|
) -> list[str]:
|
|
"""
|
|
Build the ffmpeg command list for one file.
|
|
|
|
libx264 accepts -maxrate / -bufsize directly.
|
|
libx265 requires those same constraints via -x265-params because its
|
|
CLI option names differ from the generic ffmpeg flags.
|
|
Both use AAC audio at 128 kbps.
|
|
-movflags +faststart is only meaningful for MP4 containers but is
|
|
silently ignored for MKV / MOV / etc., so it is always included.
|
|
"""
|
|
if is_hevc:
|
|
vbv_maxrate = int(video_k * 1.5)
|
|
vbv_bufsize = video_k * 2
|
|
encoder_opts = [
|
|
'-c:v', encoder,
|
|
'-b:v', f'{video_k}k',
|
|
'-x265-params', f'vbv-maxrate={vbv_maxrate}:vbv-bufsize={vbv_bufsize}',
|
|
]
|
|
else:
|
|
encoder_opts = [
|
|
'-c:v', encoder,
|
|
'-b:v', f'{video_k}k',
|
|
'-maxrate', f'{int(video_k * 1.5)}k',
|
|
'-bufsize', f'{video_k * 2}k',
|
|
]
|
|
|
|
return [
|
|
'ffmpeg', '-y', '-i', src,
|
|
*encoder_opts,
|
|
'-c:a', 'aac', '-b:a', '128k',
|
|
'-movflags', '+faststart',
|
|
'-progress', 'pipe:1', '-nostats',
|
|
out,
|
|
]
|
|
|
|
|
|
def _get_duration(filepath: str) -> float:
|
|
"""Return the duration of *filepath* in seconds, or 0.0 on failure."""
|
|
try:
|
|
probe = subprocess.run(
|
|
['ffprobe', '-v', 'error',
|
|
'-show_entries', 'format=duration',
|
|
'-of', 'default=noprint_wrappers=1:nokey=1',
|
|
filepath],
|
|
capture_output=True, text=True, timeout=30,
|
|
)
|
|
return float(probe.stdout.strip()) if probe.stdout.strip() else 0.0
|
|
except Exception:
|
|
return 0.0
|
|
|
|
|
|
def _send_notification(job: dict, email_results: list[dict], cancelled: bool) -> None:
    """Email the job's results (if an address was given) and push a 'notify' event.

    The event is pushed whether or not the email succeeded, so the UI can
    surface delivery failures to the user.
    """
    recipient = job.get('notify_email', '')
    if not recipient:
        return  # user never asked for a notification

    ok, err = send_completion_email(recipient, email_results, cancelled)
    if ok:
        message = f'Notification sent to {recipient}.'
    else:
        message = f'Could not send notification: {err}'

    push_event(job, {'type': 'notify', 'success': ok, 'message': message})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Compression worker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run_compression_job(job_id: str) -> None:
    """
    Worker function executed in a daemon thread for each compression job.

    Iterates over the file list, runs ffmpeg for each file, streams progress
    events, and sends an email notification when finished (if requested).
    """
    # Lookup only is guarded by the global lock; a missing id means the job
    # was already removed, so there is nothing to do.
    with job_lock:
        job = active_jobs.get(job_id)
    if not job:
        return

    files = job['files']
    suffix = job['suffix']
    total = job['total']

    push_event(job, {
        'type': 'start',
        'total': total,
        'message': f'Starting compression of {total} file(s)',
    })

    for idx, file_info in enumerate(files):

        # ── Cancellation check ────────────────────────────────────────────
        # Read the flag under the per-job lock; _handle_cancel pushes the
        # event, marks status, and sends the (partial-results) notification.
        with job['lock']:
            cancelled = job['cancelled']
        if cancelled:
            _handle_cancel(job, idx)
            return

        # ── Per-file setup ────────────────────────────────────────────────
        src_path = file_info['path']
        target_bitrate = file_info.get('target_bit_rate_bps', 1_000_000)
        src_codec = file_info.get('codec', 'unknown')
        p = Path(src_path)
        # Output lands next to the source: <stem><suffix><ext>.
        out_path = str(p.parent / (p.stem + suffix + p.suffix))
        encoder, is_hevc = _choose_encoder(src_codec)
        # bps -> kbps, with a 200k floor so tiny targets stay playable.
        video_k = max(int(target_bitrate / 1000), 200)

        push_event(job, {
            'type': 'file_start',
            'index': idx,
            'total': total,
            'filename': p.name,
            'output': out_path,
            'encoder': encoder,
            'message': f'Compressing ({idx + 1}/{total}): {p.name} [{encoder}]',
        })

        duration_secs = _get_duration(src_path)
        cmd = _build_ffmpeg_cmd(src_path, out_path, video_k, is_hevc, encoder)

        # ── Run ffmpeg ────────────────────────────────────────────────────
        try:
            # NOTE(review): stderr is piped but only read after the process
            # exits (in _push_file_error). With -nostats the volume is small,
            # but a very chatty ffmpeg could in principle fill the stderr
            # pipe and stall — confirm acceptable, or drain stderr too.
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,  # line-buffered so -progress lines arrive promptly
            )
            # Expose the Popen handle so a cancel request can terminate it.
            with job['lock']:
                job['process'] = proc

            _stream_progress(job, proc, idx, duration_secs)
            proc.wait()

            # Re-check cancellation: _stream_progress may have terminated
            # ffmpeg mid-file, leaving a partial output to clean up.
            with job['lock']:
                cancelled = job['cancelled']

            if cancelled:
                _remove_partial(out_path)
                _handle_cancel(job, idx)
                return

            if proc.returncode != 0:
                _push_file_error(job, idx, p.name, proc)
            else:
                _push_file_done(job, idx, p.name, out_path, file_info)

            with job['lock']:
                job['current_index'] = idx + 1

        except Exception as exc:
            # Per-file failures are reported as events, never raised — the
            # worker must continue with the remaining files.
            push_event(job, {
                'type': 'file_error',
                'index': idx,
                'filename': p.name,
                'message': f'Exception: {exc}',
            })

    # ── All files processed ───────────────────────────────────────────────
    push_event(job, {
        'type': 'done',
        'message': f'All {total} file(s) processed.',
    })
    # Snapshot the event list under the lock; the notification summary is
    # built from it outside the lock.
    with job['lock']:
        job['status'] = 'done'
        all_events = list(job['events'])

    completed = [{'status': 'done', **e} for e in all_events if e.get('type') == 'file_done']
    errored = [{'status': 'error', **e} for e in all_events if e.get('type') == 'file_error']
    _send_notification(job, completed + errored, cancelled=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Private sub-helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _stream_progress(
    job: dict,
    proc: subprocess.Popen,
    idx: int,
    duration_secs: float,
) -> None:
    """Read ffmpeg's -progress key=value output and push progress events.

    Checks the job's cancel flag on every line so a user cancel terminates
    ffmpeg promptly; emits a final 100% event when ffmpeg reports the end.
    """
    for raw_line in proc.stdout:
        # Cancellation wins over everything else.
        with job['lock']:
            if job['cancelled']:
                proc.terminate()
                return

        text = raw_line.strip()
        if '=' not in text:
            continue
        key, _, value = text.partition('=')
        key = key.strip()
        value = value.strip()

        if key == 'out_time_ms' and duration_secs > 0:
            try:
                # Despite the name, out_time_ms is in microseconds.
                elapsed = int(value) / 1_000_000
            except ValueError:
                continue
            pct = min(100.0, (elapsed / duration_secs) * 100)
            push_event(job, {
                'type': 'progress',
                'index': idx,
                'percent': round(pct, 1),
                'elapsed_secs': round(elapsed, 1),
                'duration_secs': round(duration_secs, 1),
            })
        elif key == 'progress' and value == 'end':
            push_event(job, {
                'type': 'progress',
                'index': idx,
                'percent': 100.0,
                'elapsed_secs': duration_secs,
                'duration_secs': duration_secs,
            })
|
|
|
|
|
|
def _remove_partial(path: str) -> None:
|
|
try:
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def _handle_cancel(job: dict, idx: int) -> None:
    """Push a 'cancelled' event, mark the job cancelled, and notify by email.

    The email summary includes whatever files already finished or errored
    before the cancel. (*idx* is accepted for caller symmetry but is not
    currently used.)
    """
    push_event(job, {'type': 'cancelled', 'message': 'Compression cancelled by user'})

    with job['lock']:
        job['status'] = 'cancelled'
        snapshot = list(job['events'])

    done_results = [{'status': 'done', **ev} for ev in snapshot if ev.get('type') == 'file_done']
    error_results = [{'status': 'error', **ev} for ev in snapshot if ev.get('type') == 'file_error']
    _send_notification(job, done_results + error_results, cancelled=True)
|
|
|
|
|
|
def _push_file_error(
    job: dict,
    idx: int,
    filename: str,
    proc: subprocess.Popen,
) -> None:
    """Push a 'file_error' event for a failed ffmpeg run.

    Includes the last 500 characters of ffmpeg's stderr as 'detail' when it
    can be read; any problem reading stderr degrades to an empty detail.
    """
    tail = ''
    try:
        tail = proc.stderr.read()[-500:]
    except Exception:
        tail = ''

    push_event(job, {
        'type': 'file_error',
        'index': idx,
        'filename': filename,
        'message': f'ffmpeg exited with code {proc.returncode}',
        'detail': tail,
    })
|
|
|
|
|
|
def _push_file_done(
    job: dict,
    idx: int,
    filename: str,
    out_path: str,
    file_info: dict,
) -> None:
    """Push a 'file_done' event with size and reduction stats for one file.

    Reads the compressed file's size from disk and compares it against the
    original size recorded in *file_info* ('size_bytes'). If the output file
    cannot be stat'ed, zeros are reported rather than raising — the file did
    encode successfully, we just can't measure it.
    """
    try:
        out_sz = os.path.getsize(out_path)
        out_gb = round(out_sz / (1024 ** 3), 3)
        orig_sz = file_info.get('size_bytes', 0)
        # Guard against unknown/zero original size to avoid ZeroDivisionError.
        reduction = round((1 - out_sz / orig_sz) * 100, 1) if orig_sz else 0
    except OSError:
        out_gb = 0
        reduction = 0

    push_event(job, {
        'type': 'file_done',
        'index': idx,
        'filename': filename,
        'output': out_path,
        'output_size_gb': out_gb,
        'reduction_pct': reduction,
        # Fix: the message previously contained the literal placeholder
        # '(unknown)' instead of identifying the file that finished.
        'message': f'Completed: {filename} → saved {reduction}%',
    })
|