/** * stream.js * --------- * SSE progress stream management and reconnect / snapshot-restore logic. * * Exports * ------- * startProgressStream(jobId, files) — open (or re-open) the SSE connection * reconnectToJob(jobId) — fetch snapshot then re-open stream * applySnapshot(snap) — paint a server snapshot onto the UI * initStreamControls() — wire up Reconnect buttons; call once */ import { state, els, announce } from './state.js'; import { fmtTime } from './utils.js'; import { setupProgressSection, setOverallProgress, updateFileProgress, showStreamLost, hideStreamLost, showResults, } from './progress.js'; // ─── SSE stream ─────────────────────────────────────────────────────────────── /** * Open a Server-Sent Events connection for *jobId*. * * Resumes from state.seenEventCount so no events are replayed or skipped * after a reconnect. doneCount is seeded from already-known results so * the overall progress bar is correct on the first incoming event. * * @param {string} jobId * @param {Array} files — file objects (need .name for announcements) */ export function startProgressStream(jobId, files) { // Cancel any pending auto-reconnect timer if (state.reconnectTimer) { clearTimeout(state.reconnectTimer); state.reconnectTimer = null; } // Close any stale connection if (state.eventSource) { state.eventSource.close(); state.eventSource = null; } hideStreamLost(); state.eventSource = new EventSource( `/api/compress/progress/${jobId}?from=${state.seenEventCount}`, ); // Seed from results already recorded by applySnapshot (reconnect path) let doneCount = state.compressionResults.filter( r => r.status === 'done' || r.status === 'error', ).length; state.eventSource.onmessage = evt => { let data; try { data = JSON.parse(evt.data); } catch { return; } state.seenEventCount++; switch (data.type) { case 'start': els.progStatus.textContent = 'Running'; break; case 'file_start': updateFileProgress(data.index, 0, 'running', 'Compressing…', '', ''); document.getElementById(`fpi-${data.index}`) ?.scrollIntoView({ behavior: 'smooth', block: 'nearest' }); announce( `Compressing file ${data.index + 1} of ${data.total}: ` + `${files[data.index]?.name || ''}`, ); break; case 'progress': { const pct = data.percent || 0; const detail = (data.elapsed_secs > 0 && data.duration_secs > 0) ? `${fmtTime(data.elapsed_secs)} / ${fmtTime(data.duration_secs)}` : ''; updateFileProgress(data.index, pct, 'running', 'Compressing…', detail, ''); setOverallProgress(((doneCount + pct / 100) / files.length) * 100); break; } case 'file_done': { doneCount++; els.progDone.textContent = doneCount; const detail = data.reduction_pct ? `Saved ${data.reduction_pct}% → ${data.output_size_gb} GB` : 'Complete'; updateFileProgress(data.index, 100, 'done', '✓ Done', detail, 'success'); setOverallProgress((doneCount / files.length) * 100); // Guard against replay on reconnect if (!state.compressionResults.find( r => r.index === data.index && r.status === 'done', )) { state.compressionResults.push({ ...data, status: 'done' }); } announce( `File complete: ${files[data.index]?.name}. Saved ${data.reduction_pct}%.`, ); break; } case 'file_error': { doneCount++; els.progDone.textContent = doneCount; updateFileProgress(data.index, 0, 'error', '✗ Error', data.message, 'error'); if (!state.compressionResults.find( r => r.index === data.index && r.status === 'error', )) { state.compressionResults.push({ ...data, status: 'error' }); } announce(`Error: ${files[data.index]?.name}: ${data.message}`); break; } case 'notify': els.notifyStatus.hidden = false; els.notifyStatus.className = `notify-status ${data.success ? 'ok' : 'fail'}`; els.notifyStatus.textContent = `✉ ${data.message}`; announce(data.message); break; case 'done': state.eventSource.close(); sessionStorage.removeItem('vp-job-id'); els.progStatus.textContent = 'Complete'; setOverallProgress(100); els.cancelBtn.disabled = true; announce('All compression operations complete.'); showResults('done'); break; case 'cancelled': state.eventSource.close(); sessionStorage.removeItem('vp-job-id'); els.progStatus.textContent = 'Cancelled'; announce('Compression cancelled.'); showResults('cancelled'); break; case 'error': state.eventSource.close(); els.progStatus.textContent = 'Error'; announce(`Compression error: ${data.message}`); break; } }; state.eventSource.onerror = () => { // CLOSED means the stream ended cleanly (done/cancelled) — ignore. if (!state.eventSource || state.eventSource.readyState === EventSource.CLOSED) return; state.eventSource.close(); state.eventSource = null; showStreamLost(); // Auto-retry after 5 s state.reconnectTimer = setTimeout(() => { if (state.currentJobId) reconnectToJob(state.currentJobId); }, 5_000); }; } // ─── Reconnect ──────────────────────────────────────────────────────────────── /** * Fetch a fresh status snapshot from the server, rebuild the progress UI to * reflect everything that happened while disconnected, then re-open the SSE * stream starting from the last event already processed. * * @param {string} jobId */ export async function reconnectToJob(jobId) { if (state.reconnectTimer) { clearTimeout(state.reconnectTimer); state.reconnectTimer = null; } hideStreamLost(); els.progStatus.textContent = 'Reconnecting…'; announce('Reconnecting to compression job…'); try { const resp = await fetch(`/api/compress/status/${jobId}`); if (!resp.ok) throw new Error('Job no longer available on server.'); const snap = await resp.json(); applySnapshot(snap); if (snap.status === 'done' || snap.status === 'cancelled') { showResults(snap.status); sessionStorage.removeItem('vp-job-id'); } else { startProgressStream(jobId, snap.files); announce('Reconnected. Progress restored.'); } } catch (err) { els.progStatus.textContent = 'Reconnect failed'; showStreamLost(); els.streamLostBanner.querySelector('.banner-text').textContent = `Could not reconnect: ${err.message}`; announce(`Reconnect failed: ${err.message}`); } } // ─── Snapshot restore ──────────────────────────────────────────────────────── /** * Paint a server-supplied status snapshot onto the progress UI. * * Called by: * - reconnectToJob() after a mid-session SSE drop * - tryRestoreSession() on every page load to recover an active job * * @param {object} snap — response from GET /api/compress/status/ */ export function applySnapshot(snap) { // Rebuild the per-file DOM if the page was reloaded and lost it if (!document.getElementById('fpi-0')) { setupProgressSection(snap.files); } state.currentJobId = snap.job_id; state.seenEventCount = snap.event_count; sessionStorage.setItem('vp-job-id', snap.job_id); els.sectionProgress.hidden = false; els.progTotal.textContent = snap.total; els.progDone.textContent = snap.done_count; els.progStatus.textContent = snap.status === 'running' ? 'Running' : snap.status === 'done' ? 'Complete' : snap.status === 'cancelled' ? 'Cancelled' : snap.status; // Restore each file bar from the snapshot's computed file_states snap.file_states.forEach((fs, idx) => { const statusClass = { done: 'done', error: 'error', running: 'running' }[fs.status] || 'waiting'; const statusText = { done: '✓ Done', error: '✗ Error', running: 'Compressing…' }[fs.status] || 'Waiting'; const detailClass = { done: 'success', error: 'error' }[fs.status] || ''; updateFileProgress(idx, fs.percent || 0, statusClass, statusText, fs.detail || '', detailClass); }); // Restore overall bar const runningPct = snap.file_states.find(f => f.status === 'running')?.percent || 0; const overall = snap.total > 0 ? ((snap.done_count + runningPct / 100) / snap.total) * 100 : 0; setOverallProgress(Math.min(overall, 100)); // Seed compressionResults so showResults() has data if job is already done state.compressionResults = snap.file_states .filter(fs => fs.status === 'done' || fs.status === 'error') .map((fs, idx) => ({ ...fs, index: idx })); if (snap.status === 'done') { els.cancelBtn.disabled = true; setOverallProgress(100); } } // ─── Button wiring ──────────────────────────────────────────────────────────── /** * Attach click handlers to both Reconnect buttons (title-bar and banner). * Call once during app initialisation. */ export function initStreamControls() { els.reconnectBtn.addEventListener('click', () => { if (state.currentJobId) reconnectToJob(state.currentJobId); }); els.reconnectBtnBanner.addEventListener('click', () => { if (state.currentJobId) reconnectToJob(state.currentJobId); }); }