Skip to content

Store

store #

IndexedDatasets #

IndexedDatasets(path: pathlib.Path | str | None = None, force_download: bool = False)

Read-only accessor for the indexed_datasets directory.

Parameters:

Name Type Description Default

path #

pathlib.Path | str | None

Path to the indexed_datasets directory. If not provided, the latest release from Hugging Face will be downloaded.

None

force_download #

bool

If true, the indexed datasets will be downloaded even if they already exist.

False
Source code in src/imgnet/collections/store.py
def __init__(
    self, path: Path | str | None = None, force_download: bool = False
) -> None:
    """Set up the read-only accessor, fetching the datasets when needed.

    Parameters
    ----------
    path
        Path to the ``indexed_datasets`` directory. When ``None``, the
        default location is used and, if nothing is there yet, the latest
        release is downloaded from Hugging Face.
    force_download
        When True, re-download the indexed datasets even if they already
        exist locally (the existing directory is deleted first).
    """
    target = Path(path if path is not None else _default_indexed_datasets_path())
    logger.info(f"Indexed datasets path: {target.resolve()}")

    needs_fetch = force_download or not target.exists()
    if needs_fetch:
        # Imported lazily so huggingface_hub is only touched when a
        # download is actually required.
        from huggingface_hub import list_repo_commits

        repo_id = "bhklab2026/med-image-index"
        latest_commit = list_repo_commits(
            repo_id=repo_id, repo_type="dataset"
        )[0].title
        logger.warning(
            "Indexed datasets not found at %s or force_download is True. "
            "Downloading latest release from Hugging Face, for repo %s. Latest commit: %s",
            target.resolve(),
            repo_id,
            latest_commit,
        )

        # A forced refresh removes any stale copy before downloading.
        if target.exists():
            shutil.rmtree(target)
            logger.warning(
                "Deleted existing indexed datasets directory at %s.",
                target.resolve(),
            )

        parent_dir = target.parent
        parent_dir.mkdir(parents=True, exist_ok=True)

        HuggingFaceDownloader(repo_id).download(
            output_path=parent_dir,
            ignore_patterns=[".git*"],
            force_download=True,
        )

    self.path = target
    self._collection_cache: dict[str, "Collection"] = {}

collections property #

collections: list[str]

Collection names derived from subdirectories of .imgtools/.

get_collection #

get_collection(name: str) -> imgnet.collections.store.Collection

Return a cached Collection for the given name. Validates that the collection exists.

Source code in src/imgnet/collections/store.py
def get_collection(self, name: str) -> "Collection":
    """Return a cached Collection for the given name. Validates that the collection exists."""
    known = self.collections
    if name not in known:
        message = _unknown_collection_message(known, name)
        logger.error(message)
        raise ValueError(message)
    # Build lazily on first access; subsequent calls reuse the cached object.
    cached = self._collection_cache.get(name)
    if cached is None:
        cached = Collection(name=name, path=self.imgtools_path / name)
        self._collection_cache[name] = cached
    return cached

crawl_db #

crawl_db(collection: str) -> dict

Return the parsed crawl_db.json for collection.

Source code in src/imgnet/collections/store.py
def crawl_db(self, collection: str) -> dict:
    """Return the parsed ``crawl_db.json`` for *collection*."""
    record = self.get_collection(collection)
    return record.crawl_db

index #

index(collection: str) -> pandas.DataFrame

Return the index.csv for collection as a DataFrame.

Source code in src/imgnet/collections/store.py
def index(self, collection: str) -> pd.DataFrame:
    """Return the ``index.csv`` for *collection* as a DataFrame."""
    record = self.get_collection(collection)
    return record.index

source_config #

source_config(collection: str) -> imgnet.collections.source.SourceConfig

Return the validated source.json for collection.

Falls back to TCIASource() (DICOM/TCIA defaults) when no source.json exists, keeping backwards compatibility with collections that predate this file.

Source code in src/imgnet/collections/store.py
def source_config(self, collection: str) -> SourceConfig:
    """Return the validated ``source.json`` for *collection*.

    Falls back to ``TCIASource()`` (DICOM/TCIA defaults) when no
    ``source.json`` exists, keeping backwards compatibility with
    collections that predate this file.
    """
    record = self.get_collection(collection)
    return record.source_config

file_type #

file_type(collection: str) -> imgnet.collections.source.FileType

Return the FileType for collection.

Source code in src/imgnet/collections/store.py
def file_type(self, collection: str) -> FileType:
    """Return the ``FileType`` for *collection*."""
    record = self.get_collection(collection)
    return record.file_type

collection_size #

collection_size(collection: str) -> float

Return the size of collection in GB.

Source code in src/imgnet/collections/store.py
def collection_size(self, collection: str) -> float:
    """Return the size of *collection* in GB."""
    record = self.get_collection(collection)
    return record.collection_size

description #

description(collection: str) -> str

Return the description of collection.

Source code in src/imgnet/collections/store.py
def description(self, collection: str) -> str:
    """Return the description of *collection*."""
    record = self.get_collection(collection)
    return record.description

supported_query_tags #

supported_query_tags(collection: str) -> dict[str, list[str]]

Return supported query tags per modality for collection.

Source code in src/imgnet/collections/store.py
def supported_query_tags(self, collection: str) -> dict[str, list[str]]:
    """Return supported query tags per modality for *collection*."""
    record = self.get_collection(collection)
    return record.supported_query_tags

downloader #

downloader(collection: str) -> imgnet.download.base.BaseDownloader

Return the downloader for collection.

Source code in src/imgnet/collections/store.py
def downloader(self, collection: str) -> BaseDownloader:
    """Return the downloader for *collection*."""
    record = self.get_collection(collection)
    return record.downloader

summary #

summary(update: bool = False) -> dict

Parsed collections_summary.json. If the file doesn't exist (or update is True), the summary is rebuilt and written to disk first.

Source code in src/imgnet/collections/store.py
def summary(self, update: bool = False) -> dict:
    """Return the parsed ``collections_summary.json``.

    When the file is missing, or *update* is True, the summary is rebuilt
    from the collections on disk and written back before being returned.
    """
    # Fast path: a summary file already exists and no refresh was requested.
    if self.summary_path.exists() and not update:
        logger.info("Loading collections summary from %s.", self.summary_path)
        return orjson.loads(self.summary_path.read_bytes())

    logger.info(
        "Collections summary not found or update is True. Building new summary."
    )
    collection_db = self._build_collection_db()
    self.summary_path.write_bytes(orjson.dumps(collection_db))
    return collection_db

Collection #

Collection(name: str, path: pathlib.Path)
Source code in src/imgnet/collections/store.py
def __init__(self, name: str, path: Path) -> None:
    """Bind the collection *name* to its on-disk *path*."""
    self.name, self.path = name, path
    # Layout is <indexed_datasets>/.imgtools/<name>, so the datasets root
    # sits two levels above the collection directory.
    self.indexed_datasets_path = path.parent.parent

source_config cached property #

source_config: imgnet.collections.source.SourceConfig

Return the validated source config. Falls back to TCIASource() when source.json is missing.

description cached property #

description: str

Return the description of collection.

build_summary_entry #

build_summary_entry() -> dict

Build the summary dict for this collection (Modalities, BodyPartsExamined, Images, Size, etc.).

Source code in src/imgnet/collections/store.py
def build_summary_entry(self) -> dict:
    """Build the summary dict for this collection (Modalities, BodyPartsExamined, Images, Size, etc.).

    ``Modalities`` and ``BodyPartsExamined`` are always returned as lists.
    Previously they were only converted from sets inside the DICOM branch,
    so any file type that was neither NIFTI nor DICOM leaked raw ``set``
    objects — which are not JSON-serializable downstream.
    """
    summary = {
        "Modalities": set(),
        "BodyPartsExamined": set(),
        "Images": len(self.index),
        "Size": self.collection_size,
        "File Type": self.file_type.value.upper(),
        "Source": self.source_config.source.upper(),
    }

    if self.file_type == FileType.NIFTI:
        index = self.index
        # Lists here preserve the index's unique-value order.
        if "Modality" in index.columns:
            summary["Modalities"] = (
                index["Modality"].dropna().unique().tolist()
            )
        else:
            summary["Modalities"] = []
        if "BodyPartExamined" in index.columns:
            summary["BodyPartsExamined"] = (
                index["BodyPartExamined"].dropna().unique().tolist()
            )
        else:
            summary["BodyPartsExamined"] = []
    elif self.file_type == FileType.DICOM:
        crawl_json = self.crawl_db
        for value in crawl_json.values():
            if not value:
                # Defensive: skip malformed entries with no series data
                # (next(iter({})) would raise StopIteration).
                continue
            series = value[next(iter(value))]
            if series.get("Modality"):
                summary["Modalities"].add(series["Modality"])  # type: ignore
            if series.get("BodyPartExamined"):
                summary["BodyPartsExamined"].add(  # type: ignore
                    series["BodyPartExamined"]
                )

    # Normalize unconditionally so every file type yields plain lists.
    for key, value in summary.items():
        if isinstance(value, set):
            summary[key] = list(value)
    return summary