"""
FFmpeg-based video processing implementation.
This module provides video processing capabilities using FFmpeg as the backend.
"""
import subprocess
import tempfile
import numpy as np
import json
from pathlib import Path
from typing import List, Generator, Optional, Dict, Any
from PIL import Image
from ..exceptions import VideoProcessingError
from .codecs import get_codec_parameters, CodecParameters
from ..logging import get_logger
from ..types import VideoInfo
logger = get_logger("video")
[docs]
class FFmpegProcessor:
"""FFmpeg-based video processor for MemVid.
- Encodes and decodes video frames using FFmpeg.
- Used for efficient frame-level operations and video manipulation.
"""
[docs]
def __init__(
self,
fps: int,
resolution: tuple[int, int],
codec: str,
ffmpeg_options: Optional[Dict[str, Any]] = None,
):
"""Initialize FFmpeg processor.
Args:
fps: Frames per second
resolution: Video resolution (width, height)
codec: Video codec to use
ffmpeg_options: Additional FFmpeg options
"""
self.fps = fps
self.width, self.height = resolution
self.codec = codec.lower()
self.ffmpeg_options = ffmpeg_options or {}
# Get codec-specific parameters
self.codec_params, is_supported = get_codec_parameters(self.codec)
if not is_supported:
logger.warning(f"Codec {self.codec} is not supported, using default parameters")
# Override with provided options
if ffmpeg_options:
# Convert dict to CodecParameters for validation
override_params = CodecParameters(**ffmpeg_options)
# Update only the provided fields
for field in override_params.__class__.model_fields:
if field in ffmpeg_options:
setattr(self.codec_params, field, ffmpeg_options[field])
[docs]
def _get_ffmpeg_command(self, input_path: Optional[Path] = None) -> List[str]:
"""Get FFmpeg command with configured options.
Args:
input_path: Optional input file path
Returns:
List of command arguments
"""
cmd = ["ffmpeg", "-y"] # -y to overwrite output files
if input_path:
cmd.extend(["-i", str(input_path)])
return cmd
[docs]
def encode_video(
self,
frames: List[Image.Image],
output_path: Path,
) -> None:
"""Encode frames into a video file using FFmpeg.
Args:
frames: List of PIL Images to encode
output_path: Path to save the video file
Raises:
VideoProcessingError: If video encoding fails
"""
try:
if not frames:
raise VideoProcessingError("No frames to encode")
# Create temporary directory for frames
with tempfile.TemporaryDirectory() as temp_dir:
# Save frames as PNG files
frame_paths = []
for i, frame in enumerate(frames):
frame_path = Path(temp_dir) / f"frame_{i:04d}.png"
frame.save(frame_path, "PNG")
frame_paths.append(frame_path)
# Create FFmpeg command
cmd = self._get_ffmpeg_command()
cmd.extend([
"-framerate", str(self.fps),
"-i", str(Path(temp_dir) / "frame_%04d.png"),
"-c:v", self.codec,
"-pix_fmt", self.codec_params.pix_fmt,
"-vf", f"scale={self.width}:{self.height}"
])
# Add codec-specific options
if self.codec_params.video_crf is not None:
cmd.extend(["-crf", str(self.codec_params.video_crf)])
if self.codec_params.video_preset is not None:
cmd.extend(["-preset", self.codec_params.video_preset])
if self.codec_params.video_profile is not None:
cmd.extend(["-profile:v", self.codec_params.video_profile])
# Add extra FFmpeg arguments if specified
if self.codec_params.extra_ffmpeg_args:
cmd.extend(self.codec_params.extra_ffmpeg_args.split())
# Add output path
cmd.append(str(output_path))
# Run FFmpeg
subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
logger.info(f"Video encoded successfully to {output_path}")
except subprocess.CalledProcessError as e:
raise VideoProcessingError(
f"FFmpeg encoding failed: {e.stderr}"
) from e
except Exception as e:
raise VideoProcessingError(f"Failed to encode video: {str(e)}") from e
[docs]
def decode_video(
self,
video_path: Path,
) -> Generator[Image.Image, None, None]:
"""Decode frames from a video file using FFmpeg.
Args:
video_path: Path to the video file
Yields:
PIL Images from the video frames
Raises:
VideoProcessingError: If video decoding fails
"""
try:
if not video_path.exists():
raise VideoProcessingError(f"Video file not found: {video_path}")
# Create FFmpeg command for frame extraction
cmd = self._get_ffmpeg_command(video_path)
cmd.extend([
"-f", "rawvideo",
"-pix_fmt", "rgb24",
"-vf", f"scale={self.width}:{self.height}",
"-"
])
# Run FFmpeg and process output
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=10**8
)
frame_size = self.width * self.height * 3 # RGB24 format
while True:
# Read raw frame data
frame_data = process.stdout.read(frame_size)
if not frame_data or len(frame_data) != frame_size:
break
# Convert to numpy array and create PIL Image
frame_array = np.frombuffer(frame_data, dtype=np.uint8)
frame_array = frame_array.reshape((self.height, self.width, 3))
yield Image.fromarray(frame_array)
# Check for errors
process.wait()
if process.returncode != 0:
error = process.stderr.read().decode()
raise VideoProcessingError(f"FFmpeg decoding failed: {error}")
except Exception as e:
raise VideoProcessingError(f"Failed to decode video: {str(e)}") from e
[docs]
def remove_frames_from_video(
self,
video_path: Path,
frame_numbers: List[int],
output_path: Path,
) -> Path:
"""Remove specific frames from a video file using FFmpeg.
This method uses FFmpeg's select filter to efficiently remove frames
without re-encoding the entire video.
Args:
video_path: Path to the input video file
frame_numbers: List of frame numbers to remove (0-indexed)
output_path: Path to the output video file
Returns:
Path to the new video file without the specified frames
Raises:
VideoProcessingError: If frame removal fails
"""
try:
if not video_path.exists():
raise VideoProcessingError(f"Video file not found: {video_path}")
if not frame_numbers:
# No frames to remove, copy original file
import shutil
shutil.copy2(video_path, output_path)
return output_path
# Sort frame numbers in ascending order for FFmpeg filter
frame_numbers = sorted(frame_numbers)
# Create FFmpeg select filter expression
# The filter keeps frames where 'not(n)' is true for frames to remove
select_expr = "not(" + "+".join(f"eq(n,{frame})" for frame in frame_numbers) + ")"
# Build FFmpeg command
cmd = self._get_ffmpeg_command(video_path)
cmd.extend([
"-vf", f"select='{select_expr}'",
"-c:v", self.codec,
"-pix_fmt", self.codec_params.pix_fmt,
"-avoid_negative_ts", "make_zero",
"-fflags", "+genpts"
])
# Add codec-specific options
if self.codec_params.video_crf is not None:
cmd.extend(["-crf", str(self.codec_params.video_crf)])
if self.codec_params.video_preset is not None:
cmd.extend(["-preset", self.codec_params.video_preset])
if self.codec_params.video_profile is not None:
cmd.extend(["-profile:v", self.codec_params.video_profile])
# Add extra FFmpeg arguments if specified
if self.codec_params.extra_ffmpeg_args:
cmd.extend(self.codec_params.extra_ffmpeg_args.split())
# Add output path
cmd.append(str(output_path))
# Run FFmpeg
subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
logger.info(f"Removed {len(frame_numbers)} frames from video using FFmpeg")
return output_path
except subprocess.CalledProcessError as e:
raise VideoProcessingError(
f"FFmpeg frame removal failed: {e.stderr}"
) from e
except Exception as e:
raise VideoProcessingError(f"Failed to remove frames from video: {str(e)}") from e
[docs]
def get_video_info(self, video_path: Path) -> VideoInfo:
"""Get information about a video file using FFmpeg.
Args:
video_path: Path to the video file
Returns:
VideoInfo: Information about the video file
Raises:
VideoProcessingError: If getting video info fails
"""
try:
if not video_path.exists():
raise VideoProcessingError(f"Video file not found: {video_path}")
# Use FFmpeg to get video information
cmd = [
"ffprobe",
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
str(video_path)
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
# Find video stream
video_stream = None
for stream in data.get("streams", []):
if stream.get("codec_type") == "video":
video_stream = stream
break
if not video_stream:
raise VideoProcessingError("No video stream found")
# Extract video information
frame_count = int(video_stream.get("nb_frames", 0))
fps_str = video_stream.get("r_frame_rate", "0/1")
fps_parts = fps_str.split("/")
fps = float(fps_parts[0]) / float(fps_parts[1]) if len(fps_parts) == 2 else 0
width = int(video_stream.get("width", 0))
height = int(video_stream.get("height", 0))
duration_seconds = float(data.get("format", {}).get("duration", 0))
file_size_mb = video_path.stat().st_size / (1024 * 1024)
return VideoInfo(
frame_count=frame_count,
fps=fps,
width=width,
height=height,
duration_seconds=duration_seconds,
file_size_mb=file_size_mb
)
except subprocess.CalledProcessError as e:
raise VideoProcessingError(f"FFmpeg command failed: {e}")
except Exception as e:
raise VideoProcessingError(f"Failed to get video info: {str(e)}")