DimasfromLavoisier 2025-09-26 16:13:47 +02:00, committed by Dmitrii Kapitan
parent a1b30f4331
commit 83cfff9dce
4 changed files with 258 additions and 125 deletions

ammico/__init__.py

@@ -3,7 +3,8 @@ from ammico.faces import EmotionDetector, ethical_disclosure
from ammico.model import MultimodalSummaryModel
from ammico.text import TextDetector, TextAnalyzer, privacy_disclosure
from ammico.image_summary import ImageSummaryDetector
from ammico.utils import find_files, get_dataframe
from ammico.utils import find_files, get_dataframe, AnalysisType, find_videos
from ammico.video_summary import VideoSummaryDetector
# Export the version defined in project metadata
try:
@@ -14,13 +15,16 @@ except ImportError:
__version__ = "unknown"
__all__ = [
"AnalysisType",
"AnalysisExplorer",
"EmotionDetector",
"MultimodalSummaryModel",
"TextDetector",
"TextAnalyzer",
"ImageSummaryDetector",
"VideoSummaryDetector",
"find_files",
"find_videos",
"get_dataframe",
"ethical_disclosure",
"privacy_disclosure",

ammico/notebooks/DemoVideoSummaryVQA.ipynb (new file, 96 lines)

@@ -0,0 +1,96 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Video summary and visual question answering"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ammico"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Currently this module supports only video summarization, but it will be updated in the nearest future"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"video_dict = ammico.find_videos(\n",
" path=str(\"/insert/your/path/here/\"), # path to the folder with images\n",
" limit=-1, # -1 means no limit on the number of files, by default it is set to 20\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = ammico.MultimodalSummaryModel()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"vid_summary_model = ammico.VideoSummaryDetector(summary_model=model, subdict=video_dict)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"summary_dict = vid_summary_model.analyse_video()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"summary_dict[\"summary\"]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ammico-dev",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

ammico/utils.py

@@ -5,8 +5,6 @@ import pooch
import importlib_resources
import collections
import random
from enum import Enum
@@ -103,6 +101,30 @@ def _limit_results(results, limit):
return results
def find_videos(
path: str = None,
pattern=["mp4"], # TODO: test with more video formats
recursive: bool = True,
limit=5,
random_seed: int = None,
) -> dict:
"""Find video files on the file system."""
if path is None:
path = os.environ.get("AMMICO_DATA_HOME", ".")
if isinstance(pattern, str):
pattern = [pattern]
results = []
for p in pattern:
results.extend(_match_pattern(path, p, recursive=recursive))
if len(results) == 0:
raise FileNotFoundError(f"No files found in {path} with pattern '{pattern}'")
if random_seed is not None:
random.seed(random_seed)
random.shuffle(results)
videos = _limit_results(results, limit)
return initialize_dict(videos)
def find_files(
path: str = None,
pattern=["png", "jpg", "jpeg", "gif", "webp", "avif", "tiff"],

ammico/video_summary.py

@@ -1,16 +1,14 @@
import decord
import os
import re
import math
import torch
import warnings
import numpy as np
from PIL import Image
from ammico.model import MultimodalSummaryModel
from ammico.utils import AnalysisMethod, AnalysisType
from ammico.utils import AnalysisMethod
from typing import List, Optional, Union, Dict, Any, Generator, Tuple
from typing import List, Optional, Dict, Any, Generator, Tuple
from transformers import GenerationConfig
@@ -19,7 +17,6 @@ class VideoSummaryDetector(AnalysisMethod):
self,
summary_model: MultimodalSummaryModel,
subdict: dict = {},
gpu_id: int = 0,
) -> None:
"""
Class for analysing videos using the Qwen2.5-VL model.
@@ -35,7 +32,89 @@ class VideoSummaryDetector(AnalysisMethod):
super().__init__(subdict)
self.summary_model = summary_model
self.gpu_id = gpu_id
def _frame_batch_generator(
self,
indices: torch.Tensor,
timestamps: torch.Tensor,
batch_size: int,
vr,
) -> Generator[Tuple[torch.Tensor, torch.Tensor], None, None]:
"""
Yield batches of (frames, timestamps) for given frame indices.
- frames are returned as a torch.Tensor with shape (B, C, H, W).
- timestamps is a 1D torch.Tensor with B elements.
"""
total = int(indices.numel())
device = torch.device("cpu")
for start in range(0, total, batch_size):
batch_idx_tensor = indices[start : start + batch_size]
# convert to python ints for decord API
batch_idx_list = [int(x.item()) for x in batch_idx_tensor]
# decord returns ndarray-like object; keep memory layout minimal and convert once
batch_frames_np = vr.get_batch(batch_idx_list).asnumpy()
# convert to CHW torch layout
batch_frames = (
torch.from_numpy(batch_frames_np).permute(0, 3, 1, 2).contiguous()
).to(device, non_blocking=True)
batch_times = timestamps[start : start + batch_size].to(
device, non_blocking=True
)
yield batch_frames, batch_times
def _extract_video_frames(
self,
entry: Dict[str, Any],
frame_rate_per_second: float = 2,
batch_size: int = 32,
) -> Dict[str, Any]:
"""
Extract frames from a video at a specified frame rate and return them as a generator of batches.
Args:
entry (Dict[str, Any]): Subdict entry that must contain the key 'filename' with the path to the video file.
frame_rate_per_second (float, optional): Frame extraction rate in frames per second. Default is 2.
batch_size (int, optional): Number of frames to include in each batch. Default is 32.
Returns:
Dict[str, Any]: A dictionary containing a generator that yields batches of frames and their timestamps
and the total number of extracted frames.
"""
filename = entry.get("filename")
if not filename:
raise ValueError("entry must contain key 'filename'")
# TODO: consider using torchcodec for video decoding, since decord is no longer actively maintained
vr = decord.VideoReader(filename)
nframes = len(vr)
video_fps = vr.get_avg_fps()
if video_fps is None or video_fps <= 0:
video_fps = 30.0
duration = nframes / float(video_fps)
if frame_rate_per_second <= 0:
raise ValueError("frame_rate_per_second must be > 0")
n_samples = max(1, int(math.floor(duration * frame_rate_per_second)))
sample_times = (
torch.linspace(0, duration, steps=n_samples)
if n_samples > 1
else torch.tensor([0.0])
)
indices = (sample_times * video_fps).round().long()
indices = torch.clamp(indices, 0, nframes - 1).unique(sorted=True)
timestamps = indices.to(torch.float32) / float(video_fps)
total_samples = int(indices.numel())
generator = self._frame_batch_generator(indices, timestamps, batch_size, vr)
return {"generator": generator, "n_frames": total_samples}
def _normalize_whitespace(self, s: str) -> str:
return re.sub(r"\s+", " ", s).strip()
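For intuition about the sampling arithmetic above, a small self-contained sketch with made-up numbers: a 10 s clip at 30 fps sampled at 2 fps yields 20 sample times, which round to frame indices and clamp to the valid range:

import math
import torch

duration, video_fps, rate = 10.0, 30.0, 2.0           # illustrative values only
nframes = int(duration * video_fps)                   # 300 frames
n_samples = max(1, int(math.floor(duration * rate)))  # 20 samples
sample_times = torch.linspace(0, duration, steps=n_samples)
indices = (sample_times * video_fps).round().long()
indices = torch.clamp(indices, 0, nframes - 1).unique(sorted=True)
timestamps = indices.to(torch.float32) / video_fps    # seconds for each kept frame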
@@ -192,72 +271,6 @@ class VideoSummaryDetector(AnalysisMethod):
pil_list.append(Image.fromarray(arr))
return pil_list
def _extract_video_frames(
self,
video_path: Union[str, os.PathLike],
frame_rate_per_second: float = 2,
batch_size: int = 32,
) -> Dict[str, Any]:
"""
Extract frames from a video at a specified frame rate and return them as a generator of batches.
Args:
video_path (Union[str, os.PathLike]): Path to the video file.
frame_rate_per_second (float, optional): Frame extraction rate in frames per second. Default is 2.
batch_size (int, optional): Number of frames to include in each batch. Default is 32.
Returns:
Dict[str, Any]: A dictionary containing a generator that yields batches of frames and their timestamps
and the total number of extracted frames.
"""
device = (
torch.device("cuda") if (torch.cuda.is_available()) else torch.device("cpu")
)
if device == "cuda":
ctx = decord.gpu(self.gpu_id)
else:
ctx = decord.cpu()
# TODO: to support GPU version of decord: build from source to enable GPU accelerator
# https://github.com/dmlc/decord
vr = decord.VideoReader(video_path, ctx=ctx)
nframes = len(vr)
video_fps = vr.get_avg_fps()
if video_fps is None or video_fps <= 0:
video_fps = 30.0
duration = nframes / float(video_fps)
if frame_rate_per_second <= 0:
raise ValueError("frame_rate_per_second must be > 0")
n_samples = max(1, int(math.floor(duration * frame_rate_per_second)))
sample_times = (
torch.linspace(0, duration, steps=n_samples)
if n_samples > 1
else torch.tensor([0.0])
)
indices = (sample_times * video_fps).round().long()
indices = torch.clamp(indices, 0, nframes - 1).unique(sorted=True)
timestamps = indices.to(torch.float32) / float(video_fps)
total_samples = indices.numel()
def gen() -> Generator[Tuple[torch.Tensor, torch.Tensor], None, None]:
for batch_start in range(0, total_samples, batch_size):
batch_idx_tensor = indices[batch_start : batch_start + batch_size]
batch_idx_list = [int(x.item()) for x in batch_idx_tensor]
batch_frames_np = vr.get_batch(batch_idx_list).asnumpy()
batch_frames = (
torch.from_numpy(batch_frames_np).permute(0, 3, 1, 2).contiguous()
)
batch_times = timestamps[batch_start : batch_start + batch_size]
if device is not None:
batch_frames = batch_frames.to(device, non_blocking=True)
batch_times = batch_times.to(device, non_blocking=True)
yield batch_frames, batch_times
return {"generator": gen(), "n_frames": total_samples}
def brute_force_summary(
self,
extracted_video_dict: Dict[str, Any],
@@ -275,10 +288,10 @@ class VideoSummaryDetector(AnalysisMethod):
gen = extracted_video_dict["generator"]
caption_instruction = "Describe this image in one concise caption."
collected: List[Tuple[float, str]] = []
proc = self.summary_model.processor
for batch_frames, batch_times in gen:
pil_list = self._tensor_batch_to_pil_list(batch_frames.cpu())
proc = self.summary_model.processor
prompt_texts = []
for p in pil_list:
@@ -291,12 +304,10 @@ class VideoSummaryDetector(AnalysisMethod):
],
}
]
try:
prompt_text = proc.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
except TypeError:
prompt_text = proc.apply_chat_template(messages)
prompt_text = proc.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
prompt_texts.append(prompt_text)
processor_inputs = proc(
@@ -309,15 +320,16 @@ class VideoSummaryDetector(AnalysisMethod):
self.summary_model.tokenizer,
)
batch_times_cpu = (
batch_times.cpu().tolist()
if isinstance(batch_times, torch.Tensor)
else list(batch_times)
)
for t, c in zip(batch_times_cpu, captions):
# normalize batch_times to Python floats
if isinstance(batch_times, torch.Tensor):
batch_times_list = batch_times.cpu().tolist()
else:
batch_times_list = list(batch_times)
for t, c in zip(batch_times_list, captions):
collected.append((float(t), c))
collected.sort(key=lambda x: x[0])
gen.close()
MAX_CAPTIONS_FOR_SUMMARY = 200
caps_for_summary = (
@@ -339,28 +351,20 @@ class VideoSummaryDetector(AnalysisMethod):
+ "\n\nPlease produce a single concise paragraph."
)
proc = self.summary_model.processor
if hasattr(proc, "apply_chat_template"):
messages = [
{
"role": "user",
"content": [{"type": "text", "text": summary_user_text}],
}
]
try:
summary_prompt_text = proc.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
except TypeError:
summary_prompt_text = proc.apply_chat_template(messages)
summary_inputs = proc(
text=[summary_prompt_text], return_tensors="pt", padding=True
)
else:
summary_prompt_text = summary_user_text
summary_inputs = self.summary_model.tokenizer(
summary_prompt_text, return_tensors="pt"
)
messages = [
{
"role": "user",
"content": [{"type": "text", "text": summary_user_text}],
}
]
summary_prompt_text = proc.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
summary_inputs = proc(
text=[summary_prompt_text], return_tensors="pt", padding=True
)
summary_inputs = {
k: v.to(self.summary_model.device) if isinstance(v, torch.Tensor) else v
@@ -379,11 +383,11 @@ class VideoSummaryDetector(AnalysisMethod):
"summary": final_summary,
}
def analyse_videos(self, frame_rate_per_second: float = 2.0) -> Dict[str, Any]:
def analyse_video(self, frame_rate_per_second: float = 2.0) -> Dict[str, Any]:
"""
Analyse all videos in self.subdict using frame extraction and captioning.
For short videos (<=50 frames at the specified frame rate), it uses brute-force captioning.
For longer videos, it currently defaults to brute-force captioning but can be extended for more complex methods.
For short videos (<=100 frames at the specified frame rate), it uses brute-force captioning.
For longer videos, it currently defaults to brute-force captioning, but can be extended for more complex methods.
Args:
frame_rate_per_second (float): Frame extraction rate in frames per second. Default is 2.0.
@@ -391,18 +395,25 @@ class VideoSummaryDetector(AnalysisMethod):
Dict[str, Any]: A dictionary keyed by video entry, each holding the analysis results (currently the summary).
"""
minimal_edge_of_frames = 50
extracted_video_dict = self._extract_video_frames(
self.subdict["video_path"], frame_rate_per_second=frame_rate_per_second
)
if extracted_video_dict["n_frames"] <= minimal_edge_of_frames:
answer = self.brute_force_summary(extracted_video_dict)
else:
# TODO: implement processing for long videos
summary_instruction = "Describe this image in a single caption, including all important details."
answer = self.brute_force_summary(
extracted_video_dict, summary_instruction=summary_instruction
minimal_edge_of_frames = 100
all_answers = {}
# TODO: add support for answering questions about videos
for video_key in list(self.subdict.keys()):
entry = self.subdict[video_key]
extracted_video_dict = self._extract_video_frames(
entry, frame_rate_per_second=frame_rate_per_second
)
if extracted_video_dict["n_frames"] <= minimal_edge_of_frames:
answer = self.brute_force_summary(extracted_video_dict)
else:
# TODO: implement processing for long videos
summary_instruction = "Describe this image in a single caption, including all important details."
answer = self.brute_force_summary(
extracted_video_dict, summary_instruction=summary_instruction
)
all_answers[video_key] = {"summary": answer["summary"]}
# TODO: captions have to be post-processed together with the foreseen audio analysis
return all_answers
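Putting the pieces together, an end-to-end run after this commit could look like the following sketch (the path is a placeholder; it mirrors the demo notebook above):

import ammico

model = ammico.MultimodalSummaryModel()
videos = ammico.find_videos(path="/data/videos", limit=1)  # placeholder path
detector = ammico.VideoSummaryDetector(summary_model=model, subdict=videos)
results = detector.analyse_video(frame_rate_per_second=2.0)
for key, entry in results.items():
    print(key, entry["summary"])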