Python视频处理案例实操?

wen python案例 2

本文目录导读:

  1. 视频截取与裁剪
  2. 视频缩放与旋转
  3. 视频添加水印和文字
  4. 视频滤镜效果
  5. 视频拼接与合成
  6. 视频帧处理与提取
  7. 视频稳定化
  8. 实时视频处理
  9. 实用工具函数
  10. 安装依赖

我来分享几个实用的Python视频处理案例,涵盖常见的视频编辑任务。

视频截取与裁剪

import cv2
from moviepy.editor import VideoFileClip
# 方法1:使用OpenCV截取视频片段
def cut_video_opencv(input_path, output_path, start_time, end_time):
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    start_frame = int(start_time * fps)
    end_frame = int(end_time * fps)
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        if start_frame <= frame_count <= end_frame:
            out.write(frame)
    cap.release()
    out.release()
    cv2.destroyAllWindows()
# 使用moviepy(更简洁)
def cut_video_moviepy(input_path, output_path, start_time, end_time):
    clip = VideoFileClip(input_path)
    subclip = clip.subclip(start_time, end_time)
    subclip.write_videofile(output_path)
# 使用示例
cut_video_moviepy("input.mp4", "output.mp4", 10, 30)  # 截取10-30秒

视频缩放与旋转

import cv2
def resize_video(input_path, output_path, scale_percent):
    """缩放视频"""
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    # 获取原始尺寸
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # 计算缩放后的尺寸
    new_width = int(width * scale_percent / 100)
    new_height = int(height * scale_percent / 100)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (new_width, new_height))
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        resized = cv2.resize(frame, (new_width, new_height))
        out.write(resized)
    cap.release()
    out.release()
def rotate_video(input_path, output_path, angle):
    """旋转视频"""
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # 旋转矩阵
    center = (width // 2, height // 2)
    rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        rotated = cv2.warpAffine(frame, rotation_matrix, (width, height))
        out.write(rotated)
    cap.release()
    out.release()
# 使用示例
resize_video("input.mp4", "resized.mp4", 50)  # 缩小50%
rotate_video("input.mp4", "rotated.mp4", 90)  # 旋转90度

视频添加水印和文字

import cv2
from PIL import Image, ImageDraw, ImageFont
import numpy as np
def add_text_to_video(input_path, output_path, text, position=(50, 50)):
    """添加文字水印"""
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # 加载中文字体
    fontpath = "path/to/chinese_font.ttf"  # 替换为你的字体路径
    font = ImageFont.truetype(fontpath, 36)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # 转换为PIL图像
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(frame_rgb)
        # 添加文字
        draw = ImageDraw.Draw(pil_image)
        draw.text(position, text, font=font, fill=(255, 255, 255))
        # 转换回OpenCV格式
        frame_with_text = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        out.write(frame_with_text)
    cap.release()
    out.release()
def add_image_watermark(input_path, output_path, watermark_path, position=(10, 10)):
    """添加图片水印"""
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    watermark = cv2.imread(watermark_path, cv2.IMREAD_UNCHANGED)
    watermark = cv2.resize(watermark, (100, 100))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # 添加水印
        x, y = position
        frame[y:y+100, x:x+100] = cv2.addWeighted(
            frame[y:y+100, x:x+100], 0.7,
            watermark[:, :, :3], 0.3, 0
        )
        out.write(frame)
    cap.release()
    out.release()

视频滤镜效果

import cv2
import numpy as np
def apply_filters(input_path, output_path):
    """应用多种滤镜效果"""
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # 灰度滤镜
        # gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # frame = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
        # 复古滤镜(暖色调)
        frame = cv2.applyColorMap(frame, cv2.COLORMAP_OCEAN)
        # 高斯模糊
        # frame = cv2.GaussianBlur(frame, (15, 15), 0)
        # 边缘检测
        # edges = cv2.Canny(frame, 100, 200)
        # frame = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        out.write(frame)
    cap.release()
    out.release()
# 使用示例
apply_filters("input.mp4", "filtered.mp4")

视频拼接与合成

from moviepy.editor import VideoFileClip, concatenate_videoclips, CompositeVideoClip
import cv2
import numpy as np
def concatenate_videos(video_list, output_path):
    """拼接多个视频"""
    clips = [VideoFileClip(video) for video in video_list]
    final_clip = concatenate_videoclips(clips)
    final_clip.write_videofile(output_path)
def create_picture_in_picture(input_path, pip_path, output_path):
    """画中画效果"""
    main_clip = VideoFileClip(input_path)
    pip_clip = VideoFileClip(pip_path).resize(0.3).set_position(("right", "bottom"))
    # 确保画中画视频时长不超过主视频
    pip_clip = pip_clip.set_duration(main_clip.duration)
    final_clip = CompositeVideoClip([main_clip, pip_clip])
    final_clip.write_videofile(output_path)
def side_by_side(input_path1, input_path2, output_path):
    """并排显示两个视频"""
    clip1 = VideoFileClip(input_path1).resize(height=720)
    clip2 = VideoFileClip(input_path2).resize(height=720)
    # 调整宽度相同
    min_width = min(clip1.w, clip2.w)
    clip1 = clip1.resize(width=min_width)
    clip2 = clip2.resize(width=min_width)
    # 并排合成
    final = clips_array([[clip1, clip2]])
    final.write_videofile(output_path)
# 使用示例
concatenate_videos(["part1.mp4", "part2.mp4"], "merged.mp4")
create_picture_in_picture("main.mp4", "pip.mp4", "pip_result.mp4")

视频帧处理与提取

import cv2
import os
def extract_frames(input_path, output_dir, frame_interval=30):
    """提取视频帧"""
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(input_path)
    frame_count = 0
    extracted_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % frame_interval == 0:
            output_path = os.path.join(output_dir, f"frame_{extracted_count:04d}.jpg")
            cv2.imwrite(output_path, frame)
            extracted_count += 1
        frame_count += 1
    cap.release()
    print(f"提取了 {extracted_count} 帧")
def extract_audio(input_path, output_path):
    """提取音频"""
    import subprocess
    command = [
        'ffmpeg',
        '-i', input_path,
        '-vn',  # 不处理视频
        '-acodec', 'libmp3lame',
        output_path
    ]
    subprocess.run(command)
def create_video_from_frames(frame_dir, output_path, fps=24):
    """从图片序列创建视频"""
    frames = sorted([f for f in os.listdir(frame_dir) if f.endswith('.jpg')])
    if not frames:
        print("没有找到图片")
        return
    # 读取第一帧获取尺寸
    first_frame = cv2.imread(os.path.join(frame_dir, frames[0]))
    height, width = first_frame.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    for frame_name in frames:
        frame = cv2.imread(os.path.join(frame_dir, frame_name))
        out.write(frame)
    out.release()
# 使用示例
extract_frames("video.mp4", "frames/")
extract_audio("video.mp4", "audio.mp3")
create_video_from_frames("frames/", "reconstructed.mp4")

视频稳定化

import cv2
import numpy as np
def stabilize_video(input_path, output_path):
    """视频稳定化"""
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    _, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    transforms = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # 计算光流
        prev_pts = cv2.goodFeaturesToTrack(prev_gray, maxCorners=200, 
                                          qualityLevel=0.01, minDistance=30)
        curr_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, 
                                                       prev_pts, None)
        idx = np.where(status == 1)[0]
        prev_pts = prev_pts[idx]
        curr_pts = curr_pts[idx]
        # 计算仿射变换
        transform, _ = cv2.estimateAffinePartial2D(prev_pts, curr_pts)
        if transform is not None:
            transforms.append(transform)
        prev_gray = curr_gray
        # 应用累积变换
        if len(transforms) > 1:
            cum_transform = np.eye(2, 3)
            for t in transforms[-10:]:  # 平滑处理
                cum_transform[:2, :2] += t[:2, :2]
                cum_transform[:2, 2] += t[:2, 2]
            cum_transform[:2, :2] /= min(len(transforms), 10)
            cum_transform[:2, 2] /= min(len(transforms), 10)
            stabilized = cv2.warpAffine(frame, cum_transform, (width, height))
            out.write(stabilized)
        else:
            out.write(frame)
    cap.release()
    out.release()

实时视频处理

import cv2
from threading import Thread
class VideoProcessor:
    """实时视频处理类"""
    def __init__(self, src=0):
        self.cap = cv2.VideoCapture(src)
        self.running = False
        self.frame = None
    def start(self):
        """启动视频处理"""
        self.running = True
        Thread(target=self._process, daemon=True).start()
    def stop(self):
        """停止处理"""
        self.running = False
        self.cap.release()
        cv2.destroyAllWindows()
    def _process(self):
        """处理线程"""
        while self.running:
            ret, self.frame = self.cap.read()
            if not ret:
                break
            # 应用处理效果
            processed = self._apply_effects(self.frame)
            # 显示结果
            cv2.imshow('Video Processing', processed)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    def _apply_effects(self, frame):
        """应用效果(可自定义)"""
        # 示例:转换为灰度并添加边缘检测
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        return cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
# 使用示例
processor = VideoProcessor(0)  # 使用摄像头
processor.start()

实用工具函数

import os
from pathlib import Path
def get_video_info(video_path):
    """获取视频信息"""
    cap = cv2.VideoCapture(video_path)
    info = {
        'fps': cap.get(cv2.CAP_PROP_FPS),
        'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        'frame_count': int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
        'duration': cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS)
    }
    cap.release()
    return info
def batch_process_videos(input_dir, output_dir, process_func, **kwargs):
    """批量处理视频"""
    os.makedirs(output_dir, exist_ok=True)
    for video_file in Path(input_dir).glob('*.mp4'):
        input_path = str(video_file)
        output_path = os.path.join(output_dir, f"processed_{video_file.name}")
        print(f"处理: {video_file.name}")
        process_func(input_path, output_path, **kwargs)
    print("批量处理完成!")
# 使用示例
info = get_video_info("video.mp4")
print(f"视频信息: {info}")

安装依赖

pip install opencv-python moviepy pillow numpy

这些案例涵盖了视频处理的主要场景,根据你的具体需求,可以组合使用这些功能,如果你需要特定的视频处理功能,我可以提供更详细的实现。

标签: 案例实操

抱歉,评论功能暂时关闭!