Skip to content

Engine

engine #

run_query #

run_query(valid_query: 'ValidQuery', store: imgnet.collections.store.IndexedDatasets) -> pandas.DataFrame

Execute a ValidQuery against the indexed datasets store.

Source code in src/imgnet/query/engine.py
def run_query(
    valid_query: "ValidQuery", store: IndexedDatasets
) -> pd.DataFrame:
    """Execute a `ValidQuery` against the indexed datasets store.

    Args:
        valid_query: Query supplying ``collections``, ``file_type``,
            ``modalities`` and ``rules``. ``collections`` may be the string
            ``"all"``, a single collection name, or several names.
        store: Indexed datasets store; provides the supported collection
            names (``store.collections``) and each collection's file type
            (``store.file_type``).

    Returns:
        The per-collection results concatenated into a single DataFrame
        with a fresh index, in the order the collections were selected.
        An empty DataFrame is returned when no collection is selected.

    Raises:
        CollectionsValidationError: If a requested collection is not in the
            store, or its file type differs from the requested file type.
        ValueError: If a collection's file type is neither DICOM nor NIFTI.
    """

    supported = store.collections

    requested_file_type = valid_query.file_type
    # ``None`` and "all" both mean "do not restrict by file type".
    filter_by_type = requested_file_type not in (None, "all")

    collections = valid_query.collections
    modality_queries = valid_query.modalities
    rules = valid_query.rules

    if collections == "all":
        if filter_by_type:
            collections = [
                c
                for c in supported
                if store.file_type(c) == requested_file_type
            ]
        else:
            collections = supported
    if isinstance(collections, str):
        collections = [collections]
    else:
        # Normalise to a list so len() and indexing below work for any
        # iterable of collection names.
        collections = list(collections)

    for collection in collections:
        if collection not in supported:
            msg = f"Collection {collection} not found."
            raise CollectionsValidationError(msg)
        if not filter_by_type:
            continue
        collection_type = store.file_type(collection)
        if collection_type != requested_file_type:
            # The requested type may be a FileType member or a plain string;
            # report its underlying value either way.
            expected = (
                requested_file_type.value
                if isinstance(requested_file_type, FileType)
                else requested_file_type
            )
            actual = collection_type.value
            msg = (
                f"Collection {collection!r} is of type {actual}, "
                f"but query requested {expected}."
            )
            raise CollectionsValidationError(msg)

    if isinstance(modality_queries, str):
        modality_queries = [modality_queries]

    if not collections:
        # Nothing selected (e.g. the file-type filter matched no collection):
        # pd.concat would raise on an empty list, so return an empty frame.
        return pd.DataFrame()

    def _query_one(collection: str) -> pd.DataFrame:
        """Dispatch one collection to the runner matching its file type."""
        file_type = store.file_type(collection)
        if file_type == FileType.DICOM:
            return _run_query_dicom(collection, store, modality_queries, rules)
        if file_type == FileType.NIFTI:
            return _run_query_nifti(collection, store, modality_queries, rules)
        msg = f"Unsupported file type for collection {collection}: {file_type}"
        raise ValueError(msg)

    logger.info("Running query...")

    if len(collections) == 1:
        # Single collection: skip the thread pool entirely.
        matches = [_query_one(collections[0])]
    else:
        with ThreadPoolExecutor() as executor:
            # executor.map yields results in submission order, so the row
            # order of the combined frame does not depend on thread timing.
            matches = list(executor.map(_query_one, collections))

    return pd.concat(matches, ignore_index=True)