Execute a ValidQuery against the indexed datasets store.
Source code in src/imgnet/query/engine.py
def run_query(
    valid_query: "ValidQuery", store: IndexedDatasets
) -> pd.DataFrame:
    """Execute a `ValidQuery` against the indexed datasets store.

    The requested collections are resolved and validated against
    ``store.collections``. Each one is then dispatched to the DICOM or
    NIfTI backend according to ``store.file_type(collection)``, and the
    per-collection results are concatenated into one frame.

    Args:
        valid_query: Query whose ``collections``, ``file_type``,
            ``modalities`` and ``rules`` attributes are read here.
            ``collections`` may be the string ``"all"``, a single
            collection name, or a sequence of names. ``file_type`` may be
            ``None``/``"all"`` (no filtering) or a specific file type.
        store: Indexed datasets store providing ``collections`` and
            ``file_type(collection)``.

    Returns:
        The per-collection results concatenated with a fresh index, in
        the same order as the resolved collections. An empty
        ``DataFrame`` is returned when no collection is selected.

    Raises:
        CollectionsValidationError: If a requested collection is not in
            the store, or its file type differs from the requested one.
        ValueError: If a collection has a file type that is neither
            DICOM nor NIfTI.
    """
    supported = store.collections
    requested_file_type = valid_query.file_type
    collections = valid_query.collections
    modality_queries = valid_query.modalities
    rules = valid_query.rules

    filter_by_type = requested_file_type not in (None, "all")
    # Normalise the request to the enum's underlying value so that every
    # comparison below behaves the same whether the query carries a
    # ``FileType`` member or its raw value.
    expected = (
        requested_file_type.value
        if isinstance(requested_file_type, FileType)
        else requested_file_type
    )

    if collections == "all":
        if filter_by_type:
            collections = [
                c for c in supported if store.file_type(c).value == expected
            ]
        else:
            collections = supported
    if isinstance(collections, str):
        collections = [collections]

    for collection in collections:
        if collection not in supported:
            msg = f"Collection {collection} not found."
            raise CollectionsValidationError(msg)
        if filter_by_type:
            actual = store.file_type(collection).value
            if actual != expected:
                msg = (
                    f"Collection {collection!r} is of type {actual}, "
                    f"but query requested {expected}."
                )
                raise CollectionsValidationError(msg)

    if isinstance(modality_queries, str):
        modality_queries = [modality_queries]

    def _query_one(collection: str) -> pd.DataFrame:
        """Run the query on one collection using its file-type backend."""
        file_type = store.file_type(collection)
        if file_type == FileType.DICOM:
            return _run_query_dicom(collection, store, modality_queries, rules)
        if file_type == FileType.NIFTI:
            return _run_query_nifti(collection, store, modality_queries, rules)
        msg = f"Unsupported file type for collection {collection}: {file_type}"
        raise ValueError(msg)

    logger.info("Running query...")
    if len(collections) == 1:
        # Single collection: skip the thread-pool overhead.
        matches = [_query_one(collections[0])]
    else:
        with ThreadPoolExecutor() as executor:
            # ``map`` yields results in input order, so the row order of the
            # concatenated frame does not depend on thread timing.
            matches = list(executor.map(_query_one, collections))

    if not matches:
        # ``pd.concat`` raises ValueError on an empty list; an empty
        # selection is reported as an empty result instead.
        return pd.DataFrame()
    return pd.concat(matches, ignore_index=True)
|