Source code for stable_datasets.timeseries.freefield1010

import csv
import zipfile

from stable_datasets.schema import (
    ClassLabel,
    DatasetInfo,
    DatasetSource,
    DownloadInfo,
    Features,
    Sequence,
    Value,
    Version,
)
from stable_datasets.splits import Split, SplitGenerator
from stable_datasets.utils import BaseDatasetBuilder, bulk_download

from ._audio_utils import wav_bytes_to_series


[docs] class Freefield1010(BaseDatasetBuilder): """Freefield1010 bird-presence classification dataset.""" VERSION = Version("1.0.0") SOURCE = DatasetSource( homepage="http://machine-listening.eecs.qmul.ac.uk/bird-audio-detection-challenge/#downloads", assets={ "audio": DownloadInfo( url="https://archive.org/download/ff1010bird/ff1010bird_wav.zip", filename="ff1010bird_wav.zip", ), "metadata": DownloadInfo( url="https://ndownloader.figshare.com/files/6035814", filename="ff1010bird_metadata.csv", ), }, citation="See dataset homepage.", ) def _info(self): return DatasetInfo( description="Freefield1010 binary bird-presence audio classification dataset.", features=Features( { "series": Sequence(Sequence(Value("float32"))), "label": ClassLabel(names=["no_bird", "bird"]), "recording_id": Value("string"), "filename": Value("string"), } ), supervised_keys=("series", "label"), homepage=self.SOURCE["homepage"], citation=self.SOURCE["citation"], ) def _split_generators(self): source = self._source() audio_path, metadata_path = bulk_download( [ self._normalize_download_info(source["assets"]["audio"], asset_name="audio"), self._normalize_download_info(source["assets"]["metadata"], asset_name="metadata"), ], dest_folder=self._raw_download_dir, ) return [ SplitGenerator( name=Split.TRAIN, gen_kwargs={"audio_path": audio_path, "metadata_path": metadata_path, "split": "train"}, ) ] def _candidate_splits(self) -> list: return [Split.TRAIN] def _generate_examples(self, audio_path, metadata_path, split): del split with open(metadata_path, newline="", encoding="utf-8") as fh: rows = list(csv.DictReader(fh)) with zipfile.ZipFile(audio_path) as archive: for row in rows: recording_id = str(row.get("itemid", row.get("id", ""))) filename = f"{recording_id}.wav" member = _zip_member_by_suffix(archive, f"wav/{filename}") yield ( recording_id, { "series": wav_bytes_to_series(archive.read(member)), "label": int(row.get("hasbird", row.get("label", 0))), "recording_id": recording_id, "filename": filename, }, )
def _zip_member_by_suffix(archive: zipfile.ZipFile, suffix: str) -> str: suffix = suffix.lower() for name in archive.namelist(): if name.lower().endswith(suffix): return name raise FileNotFoundError(f"Could not find {suffix!r} in {archive.filename}")