Index writer

index_writer #

Functions:

Name              Description
generate_context  Create fake metadata for the ith file.
write_entry       Each parallel worker writes a unique path and context to the index.

IndexReadError #

IndexReadError(
    index_path: pathlib.Path, original_exception: Exception
)

Bases: imgtools.io.writers.index_writer.IndexWriterError

Raised when reading the index file fails unexpectedly.

Use this when an exception occurs while attempting to read or parse the existing CSV file.

Source code in src/imgtools/io/writers/index_writer.py
def __init__(
    self, index_path: Path, original_exception: Exception
) -> None:
    self.index_path = index_path
    self.original_exception = original_exception
    msg = f"Failed to read index file '{index_path}': {original_exception}"
    super().__init__(msg)

IndexSchemaMismatchError #

IndexSchemaMismatchError(
    missing_fields: set[str], index_path: pathlib.Path
)

Bases: imgtools.io.writers.index_writer.IndexWriterError

Raised when the index file schema is missing required fields and merging columns is disabled.

Use this error to notify the caller that the existing index cannot accommodate the current row’s structure and merging is not allowed.

Source code in src/imgtools/io/writers/index_writer.py
def __init__(self, missing_fields: set[str], index_path: Path) -> None:
    self.missing_fields = missing_fields
    self.index_path = index_path
    msg = (
        f"Schema mismatch in index file '{index_path}'. "
        f"Missing fields: {sorted(missing_fields)}. "
        "Set merge_columns=True to allow schema evolution."
    )
    super().__init__(msg)
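
The private helper that performs this schema check is not shown on this page, so the following is only a sketch of the rule described above. The function name `resolve_fieldnames`, the use of `ValueError` in place of `IndexSchemaMismatchError`, and the choice to append new columns in sorted order are all assumptions made for illustration.

```python
def resolve_fieldnames(
    existing: list[str], new_keys: set[str], merge_columns: bool
) -> list[str]:
    """Hypothetical stand-in for the schema check; not the library's code."""
    missing = new_keys - set(existing)
    if not existing or not missing:
        # New file (no header yet) or nothing new: create or keep the header.
        return existing or sorted(new_keys)
    if not merge_columns:
        # The real writer raises IndexSchemaMismatchError at this point.
        raise ValueError(f"Missing fields: {sorted(missing)}")
    # Keep the existing column order and append the new columns at the end.
    return existing + sorted(missing)


print(resolve_fieldnames(["path", "modality"], {"path", "timestamp"}, True))
# ['path', 'modality', 'timestamp']
```

With `merge_columns=False`, the same call raises instead of widening the header, which is the situation this exception reports.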

IndexWriteError #

IndexWriteError(
    index_path: pathlib.Path, original_exception: Exception
)

Bases: imgtools.io.writers.index_writer.IndexWriterError

Raised when writing to the index file fails unexpectedly.

Use this when a CSV write operation fails during append or full rewrite of the index.

Source code in src/imgtools/io/writers/index_writer.py
def __init__(
    self, index_path: Path, original_exception: Exception
) -> None:
    self.index_path = index_path
    self.original_exception = original_exception
    msg = f"Failed to write to index file '{index_path}': {original_exception}"
    super().__init__(msg)

IndexWriter #

IndexWriter(
    index_path: pathlib.Path,
    lock_path: pathlib.Path | None = None,
)

Handles safe and smart updates to a shared CSV file used as an index.

This class manages writing entries to a CSV index while avoiding problems like file corruption (from two writers editing at once), column mismatches, or missing data.

Think of this like a notebook where many writers might want to write down their output paths and metadata. This class is the referee: it waits for its turn (locking), makes sure the notebook has the right columns, and writes everything in order.

Parameters:

index_path : Path
    Path to the CSV file that acts as a shared index.
lock_path : Path | None, optional
    Path to a .lock file that ensures one writer updates at a time. If None, uses the index file path with .lock added.

Methods:

Name         Description
write_entry  Write one entry to the index file. Safe in parallel with full lock.

Source code in src/imgtools/io/writers/index_writer.py
def __init__(
    self, index_path: Path, lock_path: Path | None = None
) -> None:
    """
    Parameters
    ----------
    index_path : Path
        Path to the CSV file that acts as a shared index.
    lock_path : Path | None, optional
        Path to a `.lock` file that ensures one writer updates at a time.
            If None, uses the index file path with `.lock` added.
    """
    self.index_path: Path = index_path
    self.lock_path: Path = lock_path or index_path.with_suffix(
        index_path.suffix + ".lock"
    )
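
The default lock path can be checked with pathlib alone. This is a minimal sketch that copies the fallback expression from the constructor above; the helper name `default_lock_path` and the file names are made up for illustration.

```python
from pathlib import Path


def default_lock_path(index_path: Path) -> Path:
    """Mirror the constructor's fallback: append ".lock" after the existing suffix."""
    return index_path.with_suffix(index_path.suffix + ".lock")


print(default_lock_path(Path("results/index.csv")).name)  # index.csv.lock
print(default_lock_path(Path("results/index")).name)      # index.lock
```

Because the existing suffix is kept, the lock file sits next to the index and cannot be mistaken for the lock of a different file with the same stem.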

write_entry #

write_entry(
    path: pathlib.Path,
    context: dict[str, typing.Any],
    filepath_column: str = "path",
    replace_existing: bool = False,
    merge_columns: bool = True,
) -> None

Write one entry to the index file. Safe to call from parallel workers, because the whole read-modify-write cycle runs while holding the file lock.

You give this a path and a dictionary of info.
→ It checks the index file.
→ If the path is already in there and you want to replace it, it does.
→ If your new info has different keys, it adds new columns (if allowed).
→ Then it saves the full table back to disk, safely.

Parameters:

path : pathlib.Path, required
    The file path that you want to record in the index.
context : dict[str, typing.Any], required
    Extra metadata (e.g. subject ID, date, label) to log alongside the path.
filepath_column : str, default "path"
    Name of the column that stores the file path.
replace_existing : bool, default False
    If True, update the row if one with the same path already exists.
merge_columns : bool, default True
    If True, automatically add new columns if the context has fields the CSV didn't have yet.

Source code in src/imgtools/io/writers/index_writer.py
def write_entry(
    self,
    path: Path,
    context: dict[str, Any],
    filepath_column: str = "path",
    replace_existing: bool = False,
    merge_columns: bool = True,
) -> None:
    """Write one entry to the index file. Safe in parallel with full lock.

    You give this a path and a dictionary of info.
    → It checks the index file.
    → If the path is already in there and you want to replace it, it does.
    → If your new info has different keys, it adds new columns (if allowed).
    → Then it saves the full table back to disk, safely.

    Parameters
    ----------
    path : Path
        The file path that you want to record in the index.
    context : dict[str, Any]
        Extra metadata (e.g. subject ID, date, label) to log alongside the path.
    filepath_column : str, default="path"
        Name of the column to store the file path. Default is "path".
    replace_existing : bool, default=False
        If True, update the row if one with the same path already exists.
    merge_columns : bool, default=True
        If True, automatically add new columns if the context has fields the
            CSV didn't have yet.

    Raises
    ------
    IndexSchemaMismatchError
        If the new entry's schema doesn't match the existing one and
        merging is not allowed.
    IndexReadError
        If there are issues reading the existing index file.
    IndexWriteError
        If there are issues writing to the index file.
    """
    entry = {
        filepath_column: str(path),
        **{k: str(v) for k, v in context.items()},
    }

    with InterProcessLock(self.lock_path):
        try:
            existing_rows, existing_fieldnames = self._read_existing_rows(
                filepath_column, replace_existing, entry
            )
        except OSError as e:
            raise IndexReadError(self.index_path, e) from e

        try:
            final_fieldnames = self._validate_or_merge_schema(
                existing_fieldnames, set(entry.keys()), merge_columns
            )
        except IndexSchemaMismatchError:
            raise

        all_rows = self._normalize_rows(
            existing_rows + [entry], final_fieldnames
        )

        try:
            self._write_rows(all_rows, final_fieldnames)
        except Exception as e:
            raise IndexWriteError(self.index_path, e) from e
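
The read, merge, and rewrite steps above can be sketched with the standard csv module. This is not the library's implementation: the private helpers are not shown on this page, so the function `write_entry_sketch` is hypothetical, it takes no lock (the real method wraps these steps in `InterProcessLock`), it always merges columns, and its handling of `replace_existing` (dropping earlier rows with the same path) is an assumption.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any


def write_entry_sketch(
    index_path: Path,
    path: Path,
    context: dict[str, Any],
    filepath_column: str = "path",
    replace_existing: bool = False,
) -> None:
    """Illustrative only: no locking, and new columns are always merged."""
    # Every value is stored as a string, as in the source above.
    entry = {filepath_column: str(path)}
    entry.update({k: str(v) for k, v in context.items()})

    rows: list[dict[str, str]] = []
    fieldnames: list[str] = []
    if index_path.exists():
        with index_path.open(newline="") as f:
            reader = csv.DictReader(f)
            fieldnames = list(reader.fieldnames or [])
            rows = list(reader)
    if replace_existing:
        # Assumed behavior: drop any earlier row recorded for the same path.
        rows = [r for r in rows if r.get(filepath_column) != entry[filepath_column]]

    # Append columns the CSV has not seen yet, then pad every row to the header.
    fieldnames += [k for k in entry if k not in fieldnames]
    rows = [{k: r.get(k, "") for k in fieldnames} for r in rows + [entry]]

    # Rewrite the whole table, header included.
    with index_path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


with tempfile.TemporaryDirectory() as tmp:
    index = Path(tmp) / "index.csv"
    write_entry_sketch(index, Path("a.nii.gz"), {"modality": "CT"})
    write_entry_sketch(index, Path("b.nii.gz"), {"modality": "MR", "score": 0.5})
    write_entry_sketch(index, Path("a.nii.gz"), {"modality": "SEG"}, replace_existing=True)
    print(index.read_text())
```

In this sketch the second call widens the header with a `score` column, and the third call replaces the earlier row for `a.nii.gz`, leaving its `score` cell empty.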

IndexWriterError #

Bases: Exception

Base exception for all IndexWriter-related errors.

Catch this to handle any IndexWriter failure in one place. Because IndexReadError, IndexWriteError, and IndexSchemaMismatchError all derive from it, it also covers the more specific error types.
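
A short sketch of how the hierarchy can be used by a caller. The two classes below are stand-ins that copy the constructors documented on this page so that the snippet runs without imgtools installed; in real code they would be imported from imgtools.io.writers.index_writer. The helper `describe` is hypothetical.

```python
from pathlib import Path


# Stand-ins mirroring the documented constructors (not imported from imgtools).
class IndexWriterError(Exception):
    pass


class IndexReadError(IndexWriterError):
    def __init__(self, index_path: Path, original_exception: Exception) -> None:
        self.index_path = index_path
        self.original_exception = original_exception
        super().__init__(
            f"Failed to read index file '{index_path}': {original_exception}"
        )


class IndexSchemaMismatchError(IndexWriterError):
    def __init__(self, missing_fields: set[str], index_path: Path) -> None:
        self.missing_fields = missing_fields
        self.index_path = index_path
        super().__init__(
            f"Schema mismatch in index file '{index_path}'. "
            f"Missing fields: {sorted(missing_fields)}. "
            "Set merge_columns=True to allow schema evolution."
        )


def describe(error: IndexWriterError) -> str:
    """Hypothetical helper: choose a recovery hint based on the error type."""
    if isinstance(error, IndexSchemaMismatchError):
        return f"schema: add columns {sorted(error.missing_fields)}"
    if isinstance(error, IndexReadError):
        return f"read: inspect {error.index_path.name}"
    return "other index failure"


for exc in (
    IndexSchemaMismatchError({"modality"}, Path("index.csv")),
    IndexReadError(Path("index.csv"), OSError("disk unavailable")),
):
    try:
        raise exc
    except IndexWriterError as caught:  # the base class catches both subclasses
        print(describe(caught))
```

Catching the base class keeps the handler in one place, while the attributes on each subclass (`missing_fields`, `index_path`, `original_exception`) remain available for more specific handling.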

generate_context #

generate_context(i: int) -> dict[str, typing.Any]

Create fake metadata for the ith file.

Source code in src/imgtools/io/writers/index_writer.py
def generate_context(i: int) -> dict[str, Any]:
    """Create fake metadata for the ith file."""
    return {
        "subject_id": f"subject_{i % 10}",
        "modality": random.choice(["CT", "MR", "SEG"]),
        "timestamp": datetime.now().isoformat(),
        "quality_score": round(random.uniform(0, 1), 3),
    }

write_entry #

write_entry(i: int) -> None

Each parallel worker writes a unique path and context to the index.

Source code in src/imgtools/io/writers/index_writer.py
def write_entry(i: int) -> None:
    """Each parallel worker writes a unique path and context to the index."""
    output_path = Path(f"output/fake_file_{i}.nii.gz")
    context = generate_context(i)

    # Use a new IndexWriter per process to avoid shared state
    local_writer = IndexWriter(index_path=INDEX_PATH)
    local_writer.write_entry(path=output_path, context=context)