# Source code for stable_datasets.video.something_something_v2

import csv
import json
import os
import shutil
import stat
import tarfile
import tempfile
import zipfile
from collections.abc import Iterator
from pathlib import Path

from loguru import logger as logging

from stable_datasets.schema import DatasetInfo, DatasetSource, DownloadInfo, Features, Value, Version, Video
from stable_datasets.splits import Split, SplitGenerator
from stable_datasets.utils import BaseDatasetBuilder, _default_dest_folder, bulk_download


# File suffixes (lower-case, leading dot included) that count as video files;
# candidate paths are matched against this tuple after lower-casing the suffix.
_VIDEO_EXTENSIONS = (".webm", ".mp4", ".mkv", ".avi", ".mov")
# Name of the marker file written into an extraction directory once all of its
# archives have been unpacked; its presence marks the directory as complete.
_EXTRACTION_SENTINEL = ".extraction_complete"


def _normalize_template(value: str | None) -> str:
    # SSv2 class templates mark placeholder objects as "[something]"; labels
    # files omit the brackets, so normalize both forms to the same key.
    if value is None:
        return ""
    return str(value).replace("[", "").replace("]", "").strip()


class SomethingSomethingV2(BaseDatasetBuilder):
    """Something-Something V2 action-recognition videos.

    The Qualcomm package is large. Set ``STABLE_DATASETS_CACHE_DIR`` or pass
    ``download_dir=`` / ``processed_cache_dir=`` before loading it on machines
    with small home quotas.

    Passing ``data_dir=`` skips the download and reads an existing local copy
    instead; see :meth:`_resolve_local_inputs` for the layouts that are
    recognised.
    """

    VERSION = Version("2.0.0")

    SOURCE = DatasetSource(
        homepage="https://developer.qualcomm.com/software/ai-datasets/something-something",
        assets={
            "video_part_00": DownloadInfo(
                url="https://apigwx-aws.qualcomm.com/qsc/public/v1/api/download/software/dataset/AIDataset/Something-Something-V2/20bn-something-something-v2-00",
                filename="20bn-something-something-v2-00",
            ),
            "video_part_01": DownloadInfo(
                url="https://apigwx-aws.qualcomm.com/qsc/public/v1/api/download/software/dataset/AIDataset/Something-Something-V2/20bn-something-something-v2-01",
                filename="20bn-something-something-v2-01",
            ),
            "labels": DownloadInfo(
                url="https://softwarecenter.qualcomm.com/api/download/software/dataset/AIDataset/Something-Something-V2/20bn-something-something-download-package-labels.zip",
                filename="20bn-something-something-download-package-labels.zip",
            ),
        },
        license=(
            "Qualcomm data license agreement for research use. "
            "Users must obtain and use the dataset under Qualcomm's terms."
        ),
        citation=(
            "@inproceedings{goyal2017something, "
            'title={The "Something Something" Video Database for Learning and Evaluating Visual Common Sense}, '
            "author={Goyal, Raghav and Ebrahimi Kahou, Samira and Michalski, Vincent and Materzynska, Joanna "
            "and Westphal, Susanne and Kim, Heuna and Haenel, Valentin and Fruend, Ingo and Yianilos, Peter "
            "and Mueller-Freitag, Moritz and others}, "
            "booktitle={Proceedings of the IEEE International Conference on Computer Vision}, "
            "pages={5842--5850}, "
            "year={2017} }"
        ),
    )

    def __init__(self, config_name: str | None = None, data_dir: str | Path | None = None, **kwargs):
        """Create the builder.

        Args:
            config_name: Forwarded to the base builder.
            data_dir: Optional directory holding a local copy of the dataset.
                When given, nothing is downloaded.
            **kwargs: Forwarded to the base builder.
        """
        self.data_dir = Path(data_dir).expanduser() if data_dir is not None else None
        # Maps a resolved videos directory to its {video id -> file path} index.
        self._video_index_cache: dict[str, dict[str, Path]] = {}
        super().__init__(config_name=config_name, **kwargs)

    def _info(self) -> DatasetInfo:
        """Return the dataset description, feature schema and provenance."""
        return DatasetInfo(
            description=(
                "Something-Something V2 contains short crowd-sourced videos of "
                "humans performing fine-grained actions with everyday objects."
            ),
            features=Features(
                {
                    "video": Video(storage="path", allowed_extensions=_VIDEO_EXTENSIONS),
                    "video_id": Value("string"),
                    "video_filename": Value("string"),
                    "label": Value("int32"),
                    "text": Value("string"),
                    "template": Value("string"),
                    "placeholders_json": Value("string"),
                    "split": Value("string"),
                }
            ),
            supervised_keys=("video", "label"),
            homepage=self.SOURCE["homepage"],
            license=self.SOURCE["license"],
            citation=self.SOURCE["citation"],
        )

    def _candidate_splits(self) -> list:
        """Return the splits this builder can produce."""
        return [Split.TRAIN, Split.VALIDATION, Split.TEST]

    def _split_generators(self) -> list[SplitGenerator]:
        """Locate (or download) videos and labels and describe the three splits."""
        work_dir = Path(getattr(self, "_raw_download_dir", _default_dest_folder()))
        work_dir.mkdir(parents=True, exist_ok=True)

        if self.data_dir is None:
            source = self._source()
            assets = source["assets"]
            downloaded = bulk_download(
                [assets["video_part_00"], assets["video_part_01"], assets["labels"]],
                dest_folder=work_dir,
            )
            video_parts = downloaded[:2]
            labels_source = downloaded[2]
        else:
            video_parts, labels_source = self._resolve_local_inputs(self.data_dir)

        labels_dir = self._ensure_labels_dir(labels_source, work_dir)
        videos_dir = self._ensure_videos_dir(video_parts, work_dir)
        label_map = self._load_label_map(labels_dir)

        return [
            SplitGenerator(
                name=Split.TRAIN,
                gen_kwargs={
                    "annotation_file": self._find_split_json(labels_dir, "train"),
                    "videos_dir": videos_dir,
                    "split": Split.TRAIN,
                    "label_map": label_map,
                },
            ),
            SplitGenerator(
                name=Split.VALIDATION,
                gen_kwargs={
                    "annotation_file": self._find_split_json(labels_dir, "validation"),
                    "videos_dir": videos_dir,
                    "split": Split.VALIDATION,
                    "label_map": label_map,
                },
            ),
            SplitGenerator(
                name=Split.TEST,
                gen_kwargs={
                    "annotation_file": self._find_split_json(labels_dir, "test"),
                    "videos_dir": videos_dir,
                    "split": Split.TEST,
                    "label_map": label_map,
                    "test_answers_file": self._find_optional_file(
                        labels_dir,
                        ("test-answers.csv", "test_answers.csv"),
                    ),
                },
            ),
        ]

    def _generate_examples(
        self,
        annotation_file: Path,
        videos_dir: Path,
        split: str,
        label_map: dict[str, int],
        test_answers_file: Path | None = None,
    ) -> Iterator[tuple[str, dict]]:
        """Yield ``(video_id, example)`` pairs for one split.

        Annotations whose video file cannot be found under ``videos_dir`` are
        skipped; a single warning summarising them is logged at the end. The
        label is ``-1`` when no template is known or the template is not in
        ``label_map``.
        """
        annotations = self._load_annotations(annotation_file)
        test_templates = self._load_test_answers(test_answers_file)
        video_index = self._video_index(videos_dir)

        missing = 0
        missing_examples = []
        for annotation in annotations:
            video_id = str(annotation["id"])
            video_path = video_index.get(video_id)
            if video_path is None:
                missing += 1
                # Keep only a handful of ids so the warning stays readable.
                if len(missing_examples) < 5:
                    missing_examples.append(video_id)
                continue

            template = _normalize_template(annotation.get("template"))
            # Fall back to the answers file when the annotation has no template.
            if not template and video_id in test_templates:
                template = _normalize_template(test_templates[video_id])
            label = int(label_map.get(template, -1)) if template else -1
            text = str(annotation.get("label") or template)
            placeholders = annotation.get("placeholders") or []

            yield (
                video_id,
                {
                    "video": str(video_path),
                    "video_id": video_id,
                    "video_filename": video_path.name,
                    "label": label,
                    "text": text,
                    "template": template,
                    "placeholders_json": json.dumps(placeholders),
                    "split": split,
                },
            )

        if missing:
            logging.warning(
                f"SSv2 {split}: skipped {missing}/{len(annotations)} annotations "
                f"because matching video files were missing. "
                f"First missing ids: {missing_examples}"
            )

    def _resolve_local_inputs(self, data_dir: Path) -> tuple[list[Path], Path]:
        """Find the video sources and the labels source inside ``data_dir``.

        Labels: a ``labels/`` directory, else the first ``*labels*.zip``.
        Videos, in order of preference: a ``videos/`` directory, ``data_dir``
        itself when it contains any video file, else the sorted
        ``20bn-something-something-v2*`` archive files.

        Raises:
            FileNotFoundError: If ``data_dir``, the labels or the videos
                cannot be found.
        """
        if not data_dir.exists():
            raise FileNotFoundError(f"SSv2 data_dir does not exist: {data_dir}")

        labels_dir = data_dir / "labels"
        if labels_dir.exists():
            labels_source = labels_dir
        else:
            label_zips = sorted(data_dir.glob("*labels*.zip"))
            if not label_zips:
                raise FileNotFoundError(f"Could not find labels/ or a *labels*.zip file under {data_dir}")
            labels_source = label_zips[0]

        videos_dir = data_dir / "videos"
        if videos_dir.exists():
            return [videos_dir], labels_source

        # Only existence matters here, so stop at the first hit instead of
        # materialising a list of every video file in the tree.
        has_local_videos = any(
            p.is_file() and p.suffix.lower() in _VIDEO_EXTENSIONS for p in data_dir.rglob("*")
        )
        if has_local_videos:
            return [data_dir], labels_source

        archive_candidates = sorted(
            p for p in data_dir.iterdir() if p.is_file() and p.name.startswith("20bn-something-something-v2")
        )
        if not archive_candidates:
            raise FileNotFoundError(
                f"Could not find videos/, video files, or 20bn-something-something-v2* archives under {data_dir}"
            )
        return archive_candidates, labels_source

    def _ensure_labels_dir(self, labels_source: Path, work_dir: Path) -> Path:
        """Return a directory of label files, extracting the labels zip if needed.

        Raises:
            ValueError: If ``labels_source`` is neither a directory nor a zip.
        """
        labels_source = Path(labels_source)
        if labels_source.is_dir():
            return labels_source

        labels_dir = work_dir / "ssv2_labels"
        if self._is_extraction_complete(labels_dir):
            return labels_dir
        if not zipfile.is_zipfile(labels_source):
            raise ValueError(f"SSv2 labels package is not a zip file: {labels_source}")
        self._extract_archives_to_complete_dir([labels_source], labels_dir)
        return labels_dir

    def _ensure_videos_dir(self, video_sources: list[Path], work_dir: Path) -> Path:
        """Return a directory of extracted videos for ``video_sources``.

        A single directory is returned as is. Otherwise the sources are
        unpacked into ``work_dir / "ssv2_videos"``: each one separately when
        all are zips or there is only one, else as one archive after
        concatenating the parts.
        """
        video_sources = [Path(path) for path in video_sources]
        if len(video_sources) == 1 and video_sources[0].is_dir():
            return video_sources[0]

        videos_dir = work_dir / "ssv2_videos"
        if self._is_extraction_complete(videos_dir):
            return videos_dir

        if all(zipfile.is_zipfile(path) for path in video_sources):
            self._extract_archives_to_complete_dir(video_sources, videos_dir)
            return videos_dir

        if len(video_sources) == 1:
            self._extract_archives_to_complete_dir(video_sources, videos_dir)
            return videos_dir

        concatenated = self._concatenate_parts(video_sources, work_dir)
        self._extract_archives_to_complete_dir([concatenated], videos_dir)
        return videos_dir

    def _extract_archive(self, archive: Path, dest: Path) -> None:
        """Extract a tar or zip ``archive`` into ``dest``.

        Raises:
            ValueError: If the file is neither a tar nor a zip archive.
        """
        if tarfile.is_tarfile(archive):
            self._extract_tar(archive, dest)
            return
        if zipfile.is_zipfile(archive):
            self._safe_zip_extract(archive, dest)
            return
        raise ValueError(f"Unsupported SSv2 video archive format: {archive}")

    def _extract_tar(self, archive: Path, dest: Path) -> None:
        """Extract a tar archive using the ``data`` filter when available."""
        with tarfile.open(archive, "r:*") as tf:
            # Feature-detect extraction filters rather than catching TypeError
            # around extractall(): an unrelated TypeError raised part-way
            # through must not trigger a second, legacy extraction.
            if hasattr(tarfile, "data_filter"):
                tf.extractall(dest, filter="data")
            else:
                self._safe_tar_extract_legacy(tf, dest)

    def _safe_tar_extract_legacy(self, tf: tarfile.TarFile, dest: Path) -> None:
        """Extract ``tf`` member by member for Pythons without tar filters.

        Only regular files and directories are accepted, and every member
        must resolve to a path inside ``dest``.

        Raises:
            ValueError: On a member of another type, a path escaping ``dest``
                or a member that cannot be read.
        """
        dest = Path(dest).resolve()
        for member in tf.getmembers():
            target = self._safe_archive_target(dest, member.name)
            if not (member.isfile() or member.isdir()):
                raise ValueError(f"Unsafe member in tar archive: {member.name}")
            if member.isdir():
                target.mkdir(parents=True, exist_ok=True)
                continue
            src = tf.extractfile(member)
            if src is None:
                raise ValueError(f"Could not read tar member: {member.name}")
            target.parent.mkdir(parents=True, exist_ok=True)
            with src, open(target, "wb") as out:
                shutil.copyfileobj(src, out)

    def _safe_zip_extract(self, archive: Path, dest: Path) -> None:
        """Extract a zip archive, rejecting unsafe paths and special members.

        Raises:
            ValueError: On a path escaping ``dest`` or a member whose Unix
                mode marks it as something other than a file or directory.
        """
        dest = Path(dest).resolve()
        dest.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(archive) as zf:
            for info in zf.infolist():
                target = self._safe_archive_target(dest, info.filename)
                # The upper 16 bits of external_attr hold the Unix mode; a
                # file-type of zero means no type information was recorded.
                mode = info.external_attr >> 16
                file_type = stat.S_IFMT(mode)
                if file_type and not (stat.S_ISREG(mode) or stat.S_ISDIR(mode)):
                    raise ValueError(f"Unsafe member in zip archive: {info.filename}")
                if info.is_dir():
                    target.mkdir(parents=True, exist_ok=True)
                    continue
                target.parent.mkdir(parents=True, exist_ok=True)
                with zf.open(info) as src, open(target, "wb") as out:
                    shutil.copyfileobj(src, out)

    def _safe_archive_target(self, dest: Path, name: str) -> Path:
        """Resolve member ``name`` under ``dest`` and check it stays inside.

        ``dest`` is expected to be already resolved by the caller.

        Raises:
            ValueError: If the resolved path lies outside ``dest``.
        """
        target = (dest / name).resolve()
        if os.path.commonpath([str(dest), str(target)]) != str(dest):
            raise ValueError(f"Unsafe path in archive: {name}")
        return target

    def _extract_archives_to_complete_dir(self, archives: list[Path], dest: Path) -> None:
        """Extract ``archives`` into ``dest`` via a temporary sibling directory.

        Everything is unpacked into a temporary directory next to ``dest``,
        the sentinel file is written, and only then is the directory renamed
        to ``dest``. On any failure the temporary directory is removed. Does
        nothing when ``dest`` already carries the sentinel.
        """
        dest = Path(dest)
        if self._is_extraction_complete(dest):
            return

        dest.parent.mkdir(parents=True, exist_ok=True)
        tmp_dir = Path(tempfile.mkdtemp(dir=dest.parent, prefix=f".{dest.name}_tmp_"))
        try:
            for archive in archives:
                self._extract_archive(Path(archive), tmp_dir)
            (tmp_dir / _EXTRACTION_SENTINEL).write_text("ok\n")
            # A dest without the sentinel is a leftover from an earlier,
            # incomplete run; discard it before moving the new tree in.
            if dest.exists():
                shutil.rmtree(dest)
            os.rename(tmp_dir, dest)
        except BaseException:
            shutil.rmtree(tmp_dir, ignore_errors=True)
            raise

    def _is_extraction_complete(self, dest: Path) -> bool:
        """Return whether ``dest`` contains the extraction sentinel file."""
        return (Path(dest) / _EXTRACTION_SENTINEL).is_file()

    def _concatenate_parts(self, video_sources: list[Path], work_dir: Path) -> Path:
        """Join the split archive parts (sorted by name) into a single file.

        A JSON manifest recording each part's name, size and mtime is stored
        next to the result; when it matches the current parts the existing
        file is reused. The result is written to a temporary file first and
        moved into place with ``os.replace``.

        Returns:
            Path of the concatenated file inside ``work_dir``.
        """
        parts = sorted((Path(path) for path in video_sources), key=lambda p: p.name)
        concatenated = Path(work_dir) / "20bn-something-something-v2.concatenated"
        manifest_path = concatenated.with_suffix(concatenated.suffix + ".json")

        # Stat each part once so size and mtime come from the same snapshot.
        part_stats = [(part, part.stat()) for part in parts]
        manifest = {
            "parts": [
                {
                    "name": part.name,
                    "size": info.st_size,
                    "mtime_ns": info.st_mtime_ns,
                }
                for part, info in part_stats
            ]
        }

        if concatenated.exists() and manifest_path.exists():
            try:
                existing = json.loads(manifest_path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # Unreadable or corrupt manifest: treat as stale and rebuild.
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                existing = None
            if existing == manifest:
                return concatenated

        fd, tmp_name = tempfile.mkstemp(
            dir=work_dir,
            prefix=f".{concatenated.name}_",
            suffix=".tmp",
        )
        tmp_path = Path(tmp_name)
        try:
            try:
                out = os.fdopen(fd, "wb")
            except BaseException:
                # Wrapping failed, so the raw descriptor may still be open.
                try:
                    os.close(fd)
                except OSError:
                    pass
                raise
            # From here on the file object owns the descriptor. It must not
            # be closed again by number: after the ``with`` block the number
            # may already belong to an unrelated file.
            with out:
                for part in parts:
                    with open(part, "rb") as f:
                        shutil.copyfileobj(f, out)
            os.replace(tmp_path, concatenated)
            manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
        except BaseException:
            tmp_path.unlink(missing_ok=True)
            raise
        return concatenated

    def _find_split_json(self, labels_dir: Path, split: str) -> Path:
        """Return the annotation JSON for ``split`` (train/validation/test).

        Raises:
            FileNotFoundError: If none of the known file names is found.
        """
        aliases = {
            "train": ("train.json", "something-something-v2-train.json"),
            "validation": ("validation.json", "val.json", "something-something-v2-validation.json"),
            "test": ("test.json", "something-something-v2-test.json"),
        }[split]
        found = self._find_optional_file(labels_dir, aliases)
        if found is None:
            raise FileNotFoundError(f"Could not find SSv2 {split} annotation JSON under {labels_dir}")
        return found

    def _find_optional_file(self, root: Path, names: tuple[str, ...]) -> Path | None:
        """Return the first file matching one of ``names`` under ``root``.

        Names are tried in order; for each, a direct child of ``root`` wins
        over a recursive match. Returns ``None`` when nothing matches.
        """
        for name in names:
            direct = root / name
            if direct.exists():
                return direct
            matches = sorted(root.rglob(name))
            if matches:
                return matches[0]
        return None

    def _load_label_map(self, labels_dir: Path) -> dict[str, int]:
        """Load the normalised template -> class index mapping.

        Returns an empty dict when no labels file is present.

        Raises:
            ValueError: If the labels file does not contain a JSON object.
        """
        labels_file = self._find_optional_file(
            labels_dir,
            ("labels.json", "something-something-v2-labels.json"),
        )
        if labels_file is None:
            return {}
        # Read as UTF-8 explicitly rather than relying on the locale default.
        raw = json.loads(labels_file.read_text(encoding="utf-8"))
        if isinstance(raw, dict):
            return {_normalize_template(name): int(idx) for name, idx in raw.items()}
        raise ValueError(f"Unsupported SSv2 labels file format: {labels_file}")

    def _load_annotations(self, path: Path) -> list[dict]:
        """Load a split annotation file.

        Raises:
            ValueError: If the file does not contain a JSON list.
        """
        # Read as UTF-8 explicitly rather than relying on the locale default.
        raw = json.loads(Path(path).read_text(encoding="utf-8"))
        if not isinstance(raw, list):
            raise ValueError(f"SSv2 annotation file must contain a list: {path}")
        return raw

    def _load_test_answers(self, path: Path | None) -> dict[str, str]:
        """Read the optional test-answers CSV into ``{video id: template}``.

        The delimiter (``;`` or ``,``) is sniffed from the first 4096
        characters. Rows with fewer than two columns are ignored. Returns an
        empty dict when ``path`` is ``None``.
        """
        if path is None:
            return {}

        answers = {}
        with open(path, newline="", encoding="utf-8") as f:
            sample = f.read(4096)
            f.seek(0)
            try:
                dialect = csv.Sniffer().sniff(sample, delimiters=";,")
                reader = csv.reader(f, dialect)
            except csv.Error:
                # Sniffing failed; choose the delimiter by simple presence.
                reader = csv.reader(f, delimiter=";" if ";" in sample else ",")
            for row in reader:
                if len(row) >= 2:
                    answers[str(row[0])] = row[1]
        return answers

    def _video_index(self, videos_dir: Path) -> dict[str, Path]:
        """Return a ``{file stem: path}`` index of the videos under ``videos_dir``.

        The index is built once per resolved directory and cached on the
        instance. When several files share a stem, the one visited last wins.
        """
        key = str(Path(videos_dir).resolve())
        cached = self._video_index_cache.get(key)
        if cached is not None:
            return cached

        index = {}
        for path in self._iter_video_files(Path(videos_dir)):
            index[path.stem] = path
        self._video_index_cache[key] = index
        return index

    def _iter_video_files(self, root: Path) -> Iterator[Path]:
        """Yield every video file below ``root`` without following symlinks.

        Directories that cannot be scanned are logged and skipped.
        """
        stack = [Path(root)]
        while stack:
            current = stack.pop()
            try:
                with os.scandir(current) as entries:
                    for entry in entries:
                        if entry.is_dir(follow_symlinks=False):
                            stack.append(Path(entry.path))
                        elif entry.is_file(follow_symlinks=False):
                            path = Path(entry.path)
                            if path.suffix.lower() in _VIDEO_EXTENSIONS:
                                yield path
            except OSError as exc:
                logging.warning(f"Could not scan SSv2 video directory {current}: {exc}")
# Short alias bound to the same builder class.
SSv2 = SomethingSomethingV2