nirs4all.pipeline.storage.workspace_store module

Database-backed workspace storage.

Replaces: ManifestManager, SimulationSaver, PipelineWriter, PredictionStorage, ArrayRegistry, WorkspaceExporter, PredictionResolver/TargetResolver.

All metadata, configs, logs, chains, and predictions are stored in a DuckDB database. Binary artifacts (fitted models, transformers) are stored in a flat content-addressed directory alongside the database file. Human-readable files (exported bundles, CSVs, charts) are produced only by explicit export operations.

Workspace layout after migration:

workspace/
    store.duckdb                    # All structured data
    artifacts/                      # Flat content-addressed binaries
        ab/abc123def456.joblib
    exports/                        # User-triggered exports (on demand)

class nirs4all.pipeline.storage.workspace_store.WorkspaceStore(workspace_path: Path)

Bases: object

Database-backed workspace storage.

Central storage facade for all workspace data: runs, pipelines, chains, predictions, prediction arrays, artifacts, and structured execution logs.

Replaces the file-hierarchy storage composed of ManifestManager, SimulationSaver, PipelineWriter, PredictionStorage, ArrayRegistry, WorkspaceExporter, and PredictionResolver.

The store manages two on-disk resources:

  • store.duckdb – a single DuckDB database holding seven tables (runs, pipelines, chains, predictions, prediction_arrays, artifacts, logs).

  • artifacts/ – a flat, content-addressed directory for binary artifacts (fitted models, transformers, etc.).
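The content-addressed layout can be illustrated with a minimal sketch. It assumes a SHA-256 digest of the serialised bytes, the full hex digest as the file stem, and the first two hex characters as the shard directory; `artifact_relpath` is a hypothetical helper, and the naming scheme the store actually uses may differ.

```python
import hashlib
from pathlib import PurePosixPath


def artifact_relpath(content: bytes, extension: str = "joblib") -> PurePosixPath:
    """Derive a sharded, content-addressed relative path for serialised bytes.

    Assumption: SHA-256, full hex digest as file stem, first two hex
    characters as the shard directory. The real store may differ.
    """
    digest = hashlib.sha256(content).hexdigest()
    return PurePosixPath(digest[:2]) / f"{digest}.{extension}"


# Identical bytes always map to the same path; different bytes do not.
path_a = artifact_relpath(b"fitted-model-bytes")
path_b = artifact_relpath(b"fitted-model-bytes")
path_c = artifact_relpath(b"other-bytes")
```

Because the path depends only on the content, saving the same fitted object twice cannot create a second file.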

All query methods that return tabular data return polars.DataFrame via DuckDB’s zero-copy Arrow transfer, enabling efficient downstream analysis with Polars or conversion to pandas/numpy.

Parameters:

workspace_path – Root directory of the workspace. The database file store.duckdb and the artifacts/ directory are created inside this path.

Example

>>> from pathlib import Path
>>> store = WorkspaceStore(Path("./workspace"))
>>> run_id = store.begin_run("experiment_1", config={}, datasets=[])
>>> store.complete_run(run_id, summary={"total_pipelines": 5})
begin_pipeline(run_id: str, name: str, expanded_config: Any, generator_choices: list, dataset_name: str, dataset_hash: str) -> str

Register a new pipeline execution under a run.

A pipeline represents a single expanded configuration (after generator expansion) executed on a single dataset.

Parameters:
  • run_id – Parent run identifier.

  • name – Pipeline name (e.g. "0001_pls_abc123").

  • expanded_config – The fully expanded pipeline configuration (serialisable to JSON). This is the resolved configuration after all _or_, _range_, _log_range_, and _cartesian_ generators have been applied.

  • generator_choices – List of generator choices that produced this pipeline. Each entry is a dict like {"_or_": value} or {"_range_": 18}.

  • dataset_name – Name of the dataset being processed.

  • dataset_hash – Content hash of the dataset at execution time, enabling later run-compatibility checks.

Returns:

A unique pipeline identifier (UUID-based string).

begin_run(name: str, config: Any, datasets: list[dict]) -> str

Create a new run and return its unique identifier.

A run groups one or more pipeline executions that share a common configuration and set of target datasets. The run is created in "running" status.

Parameters:
  • name – Human-readable name for the run (e.g. "protein_sweep").

  • config – Serialisable run-level configuration (stored as JSON). Typically contains cross-validation strategy, metric, and global flags.

  • datasets – List of dataset metadata dictionaries. Each entry should contain at minimum {"name": str, "path": str}. Additional keys (hash, n_samples, n_features, y_stats) are preserved for provenance tracking.

Returns:

A unique run identifier (UUID-based string).

close() -> None

Close the database connection and release resources.

Safe to call multiple times. After closing, all other methods will raise RuntimeError.

complete_pipeline(pipeline_id: str, best_val: float, best_test: float, metric: str, duration_ms: int) -> None

Mark a pipeline execution as successfully completed.

Parameters:
  • pipeline_id – Identifier returned by begin_pipeline().

  • best_val – Best validation score achieved by this pipeline.

  • best_test – Corresponding test score for the best validation model.

  • metric – Name of the metric used for ranking (e.g. "rmse").

  • duration_ms – Total execution time in milliseconds.

complete_run(run_id: str, summary: dict) -> None

Mark a run as successfully completed.

Updates the run status to "completed" and stores the provided summary (e.g. total pipelines completed, best overall score).

Parameters:
  • run_id – Identifier returned by begin_run().

  • summary – Free-form summary dictionary persisted alongside the run record.

delete_prediction(prediction_id: str) -> bool

Delete a single prediction and its associated arrays.

Parameters:

prediction_id – Prediction to delete.

Returns:

True if the prediction existed and was deleted, False otherwise.

delete_run(run_id: str, delete_artifacts: bool = True) -> int

Delete a run and all its descendant data.

Cascades to pipelines, chains, predictions, prediction arrays, and log entries. If delete_artifacts is True, artifact reference counts are decremented and files with zero references are removed from disk.

Parameters:
  • run_id – Run to delete.

  • delete_artifacts – Whether to remove orphaned artifact files.

Returns:

Total number of database rows deleted across all tables.

export_chain(chain_id: str, output_path: Path, format: str = 'n4a') -> Path

Export a chain as a standalone prediction bundle.

Builds a self-contained archive from the chain’s steps and artifacts, suitable for deployment or sharing without the workspace.

Supported formats:

  • "n4a" – ZIP archive containing manifest.json, chain.json, and all referenced artifact files.

  • "n4a.py" – Portable Python script with embedded (base64-encoded) artifacts.
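A minimal sketch of the "n4a" archive shape, using only the standard library. `write_bundle` is a hypothetical helper, and the `artifacts/` prefix inside the archive and the manifest fields are assumptions; only the presence of manifest.json, chain.json, and the artifact files comes from the description above.

```python
import json
import tempfile
import zipfile
from pathlib import Path


def write_bundle(output_path: Path, manifest: dict, chain: dict,
                 artifacts: dict[str, bytes]) -> Path:
    """Write manifest.json, chain.json and artifact payloads into one ZIP file."""
    with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("manifest.json", json.dumps(manifest, indent=2))
        zf.writestr("chain.json", json.dumps(chain, indent=2))
        for name, payload in artifacts.items():
            # The "artifacts/" prefix inside the archive is an assumption.
            zf.writestr(f"artifacts/{name}", payload)
    return output_path


with tempfile.TemporaryDirectory() as tmp:
    bundle = write_bundle(
        Path(tmp) / "model.n4a",
        manifest={"format": "n4a"},
        chain={"steps": [], "model_step_idx": 0},
        artifacts={"abc123.joblib": b"\x00\x01"},
    )
    # Read the member names back while the temporary file still exists.
    with zipfile.ZipFile(bundle) as zf:
        bundle_names = sorted(zf.namelist())
```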

Parameters:
  • chain_id – Chain to export.

  • output_path – Destination file path.

  • format – Export format ("n4a" or "n4a.py").

Returns:

The resolved output path (may have an extension appended if not already present).

Raises:

export_pipeline_config(pipeline_id: str, output_path: Path) -> Path

Export a pipeline’s expanded configuration as JSON.

The exported file contains the fully resolved pipeline configuration (after generator expansion) and can be re-used with nirs4all.run() or nirs4all.session().

Parameters:
  • pipeline_id – Pipeline to export.

  • output_path – Destination .json file path.

Returns:

The resolved output path.

Raises:

KeyError – If the pipeline does not exist.

export_predictions_parquet(output_path: Path, **filters: Any) -> Path

Export prediction records to a Parquet file.

Writes a Polars-compatible Parquet file containing prediction metadata (no arrays). Filters use the same keyword arguments as query_predictions().

Parameters:
  • output_path – Destination .parquet file path.

  • **filters – Optional filters (dataset_name, model_class, partition, fold_id, branch_id, pipeline_id, run_id).

Returns:

The resolved output path.

export_run(run_id: str, output_path: Path) -> Path

Export full run metadata (run + pipelines + chains) as YAML.

Produces an archival snapshot of the entire run, including all pipeline configs, chain descriptions, and summary metrics. Does not include binary artifacts or arrays.

Parameters:
  • run_id – Run to export.

  • output_path – Destination .yaml file path.

Returns:

The resolved output path.

Raises:

KeyError – If the run does not exist.

fail_pipeline(pipeline_id: str, error: str) -> None

Mark a pipeline execution as failed and roll back its data.

Predictions, chains, and logs associated with this pipeline are removed. Artifacts whose reference count drops to zero become candidates for garbage collection.

Parameters:
  • pipeline_id – Identifier returned by begin_pipeline().

  • error – Human-readable error description.

fail_run(run_id: str, error: str) -> None

Mark a run as failed.

Updates the run status to "failed" and records the error message. Any pipelines still in "running" status under this run should be considered implicitly failed.

Parameters:
  • run_id – Identifier returned by begin_run().

  • error – Human-readable error description or traceback excerpt.
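The run lifecycle (begin_run(), then either complete_run() or fail_run()) can be wrapped as follows. This is a sketch rather than part of the library: `run_with_status` and the `work` callback are hypothetical, and `store` is any object exposing the three documented methods.

```python
from typing import Any, Callable


def run_with_status(store: Any, name: str, work: Callable[[str], dict]) -> str:
    """Open a run, execute `work`, and record success or failure on the store."""
    run_id = store.begin_run(name, config={}, datasets=[])
    try:
        # `work` receives the run id and returns the summary dict to persist.
        summary = work(run_id)
    except Exception as exc:
        # Record the failure, then let the caller see the original exception.
        store.fail_run(run_id, error=repr(exc))
        raise
    store.complete_run(run_id, summary=summary)
    return run_id
```

Re-raising after fail_run() keeps the traceback visible to the caller while still leaving the run in "failed" status rather than "running".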

gc_artifacts() -> int

Garbage-collect unreferenced artifacts.

Removes artifact files from disk whose reference count has dropped to zero (i.e. no chain references them). Also removes the corresponding rows from the artifacts table.

Returns:

Number of artifact files removed.
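The reference-count sweep can be sketched with a plain dictionary standing in for the artifacts table. `gc_unreferenced` is a hypothetical helper; how the store tracks counts internally is not specified here.

```python
import tempfile
from pathlib import Path


def gc_unreferenced(ref_counts: dict[str, int], artifacts_dir: Path) -> int:
    """Delete files whose reference count is zero and drop their index entries."""
    removed = 0
    # Iterate over a copy because entries are deleted during the loop.
    for rel_path, count in list(ref_counts.items()):
        if count > 0:
            continue
        file_path = artifacts_dir / rel_path
        if file_path.exists():
            file_path.unlink()
            removed += 1
        del ref_counts[rel_path]
    return removed


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "keep.joblib").write_bytes(b"a")
    (root / "drop.joblib").write_bytes(b"b")
    index = {"keep.joblib": 2, "drop.joblib": 0}
    removed_count = gc_unreferenced(index, root)
    remaining_files = sorted(p.name for p in root.iterdir())
```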

get_artifact_path(artifact_id: str) -> Path

Return the filesystem path of a stored artifact.

Useful when external tools need direct file access (e.g. for building a .n4a bundle ZIP).

Parameters:

artifact_id – Identifier returned by save_artifact().

Returns:

Absolute path to the artifact file.

Raises:

KeyError – If the artifact identifier is unknown.

get_chain(chain_id: str) -> dict | None

Retrieve a chain by its identifier.

Returns:

A dictionary containing all chain fields (steps, model_step_idx, fold_artifacts, shared_artifacts, etc.), or None if the chain does not exist.

get_chains_for_pipeline(pipeline_id: str) -> DataFrame

List all chains belonging to a pipeline.

Returns:

A polars.DataFrame with one row per chain, including columns for chain_id, model_class, preprocessings, branch_path, and source_index.

get_pipeline(pipeline_id: str) -> dict | None

Retrieve a single pipeline record.

Returns:

A dictionary with all pipeline fields (pipeline_id, run_id, name, status, expanded_config, generator_choices, dataset_name, dataset_hash, best_val, best_test, metric, duration_ms, created_at, completed_at, error), or None if the pipeline does not exist.

get_pipeline_log(pipeline_id: str) -> DataFrame

Retrieve all log entries for a pipeline.

Returns:

A polars.DataFrame ordered by (step_idx, timestamp) with columns step_idx, operator_class, event, duration_ms, message, details, level, timestamp.

get_prediction(prediction_id: str, load_arrays: bool = False) -> dict | None

Retrieve a single prediction record.

Parameters:
  • prediction_id – Unique prediction identifier.

  • load_arrays – If True, the returned dictionary includes y_true, y_pred, y_proba, sample_indices, and weights as numpy.ndarray objects. If False (default), arrays are omitted for speed.

Returns:

Prediction dictionary or None if not found.

get_run(run_id: str) -> dict | None

Retrieve a single run record.

Returns:

A dictionary with all run fields (run_id, name, status, config, datasets, summary, created_at, completed_at, error), or None if the run does not exist.

get_run_log_summary(run_id: str) -> DataFrame

Aggregate log entries across all pipelines of a run.

Produces a summary suitable for dashboard display: per-pipeline total duration, step counts, warning/error counts.

Returns:

A polars.DataFrame with one row per pipeline in the run.

list_pipelines(run_id: str | None = None, dataset_name: str | None = None) -> DataFrame

List pipelines with optional filtering.

Parameters:
  • run_id – Filter by parent run. None returns pipelines from all runs.

  • dataset_name – Filter by dataset name. None returns pipelines for all datasets.

Returns:

A polars.DataFrame with one row per matching pipeline, ordered by created_at descending.

list_runs(status: str | None = None, dataset: str | None = None, limit: int = 100, offset: int = 0) -> DataFrame

List runs with optional filtering and pagination.

Parameters:
  • status – Filter by run status ("running", "completed", "failed"). None returns all statuses.

  • dataset – Filter to runs that reference this dataset name in their datasets list.

  • limit – Maximum number of rows to return.

  • offset – Number of rows to skip (for pagination).

Returns:

A polars.DataFrame with one row per matching run, ordered by created_at descending (newest first).
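The limit/offset pair supports straightforward paging. The sketch below is generic: `iter_pages` is a hypothetical helper, and the `fetch` callable is a stand-in that slices a list; with a real store it might be `lambda limit, offset: store.list_runs(limit=limit, offset=offset)`.

```python
from typing import Callable, Iterator, Sequence


def iter_pages(fetch: Callable[[int, int], Sequence],
               page_size: int = 100) -> Iterator[Sequence]:
    """Yield successive pages until a short or empty page signals the end."""
    offset = 0
    while True:
        page = fetch(page_size, offset)
        if len(page) == 0:
            return
        yield page
        if len(page) < page_size:
            return
        offset += page_size


# Stand-in data source: 250 rows served in slices of at most `limit` rows.
rows = list(range(250))
pages = list(iter_pages(lambda limit, offset: rows[offset:offset + limit],
                        page_size=100))
page_sizes = [len(p) for p in pages]
```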

load_artifact(artifact_id: str) -> Any

Load a binary artifact from disk.

Parameters:

artifact_id – Identifier returned by save_artifact().

Returns:

The deserialised Python object.

Raises:

log_step(pipeline_id: str, step_idx: int, operator_class: str, event: str, duration_ms: int | None = None, message: str | None = None, details: dict | None = None, level: str = 'info') -> None

Record a structured log entry for a pipeline step.

Step logs enable post-hoc analysis of execution timelines, per-step durations, warnings, and errors without parsing text log files.

Parameters:
  • pipeline_id – Pipeline the step belongs to.

  • step_idx – Zero-based step index in the pipeline.

  • operator_class – Fully qualified class name of the operator executed in this step.

  • event – Event name (e.g. "start", "end", "skip", "warning", "error").

  • duration_ms – Step execution time in milliseconds (typically set on "end" events).

  • message – Optional human-readable message.

  • details – Optional structured details (stored as JSON).

  • level – Log level ("debug", "info", "warning", "error").
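One way to emit paired "start"/"end" entries with a measured duration is a small context manager around each step. `timed_step` is a hypothetical helper, not part of the library; it relies only on the log_step() signature documented above.

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def timed_step(store: Any, pipeline_id: str, step_idx: int,
               operator_class: str) -> Iterator[None]:
    """Log "start", then "end" with duration_ms, or "error" if the body raises."""
    store.log_step(pipeline_id, step_idx, operator_class, "start")
    started = time.perf_counter()
    try:
        yield
    except Exception as exc:
        store.log_step(pipeline_id, step_idx, operator_class, "error",
                       message=str(exc), level="error")
        raise
    elapsed_ms = int((time.perf_counter() - started) * 1000)
    store.log_step(pipeline_id, step_idx, operator_class, "end",
                   duration_ms=elapsed_ms)
```

A step would then run inside `with timed_step(store, pipeline_id, 0, "sklearn.preprocessing.StandardScaler"): ...`.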

query_predictions(dataset_name: str | None = None, model_class: str | None = None, partition: str | None = None, fold_id: str | None = None, branch_id: int | None = None, pipeline_id: str | None = None, run_id: str | None = None, limit: int | None = None, offset: int = 0) -> DataFrame

Query predictions with flexible filtering.

All filter arguments are optional. When multiple filters are specified, they are combined with AND semantics.

Parameters:
  • dataset_name – Filter by dataset name.

  • model_class – Filter by model class name (supports SQL LIKE patterns, e.g. "PLS%").

  • partition – Filter by partition ("train", "val", "test").

  • fold_id – Filter by fold identifier.

  • branch_id – Filter by branch index.

  • pipeline_id – Filter by parent pipeline.

  • run_id – Filter by parent run (joins through pipelines).

  • limit – Maximum number of rows. None for unlimited.

  • offset – Number of rows to skip.

Returns:

A polars.DataFrame with one row per matching prediction (arrays are not included; use get_prediction() with load_arrays=True for that).
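The AND semantics and the LIKE matching on model_class can be sketched as a WHERE-clause builder. This is an illustration, not the store's actual query code: `build_where` is hypothetical, the table is reduced to two columns, and the standard-library sqlite3 module stands in for DuckDB so the example runs without extra dependencies.

```python
import sqlite3


def build_where(filters: dict) -> tuple[str, list]:
    """Combine non-None filters with AND; model_class uses LIKE, others use =.

    Column names are interpolated into the SQL text, so the keys must come
    from a fixed allow-list and never from user input.
    """
    clauses, params = [], []
    for column, value in filters.items():
        if value is None:
            continue
        operator = "LIKE" if column == "model_class" else "="
        clauses.append(f'"{column}" {operator} ?')
        params.append(value)
    where = " WHERE " + " AND ".join(clauses) if clauses else ""
    return where, params


conn = sqlite3.connect(":memory:")  # stand-in for DuckDB in this sketch
conn.execute('CREATE TABLE predictions ("model_class" TEXT, "partition" TEXT)')
conn.executemany(
    "INSERT INTO predictions VALUES (?, ?)",
    [("PLSRegression", "val"), ("PLSRegression", "test"), ("Ridge", "val")],
)
where, params = build_where(
    {"model_class": "PLS%", "partition": "val", "fold_id": None}
)
matches = conn.execute(
    'SELECT "model_class", "partition" FROM predictions' + where, params
).fetchall()
conn.close()
```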

register_existing_artifact(artifact_id: str, path: str, content_hash: str, operator_class: str, artifact_type: str, format: str, size_bytes: int) -> str

Register an artifact that was already saved to disk.

This is used to bridge the ArtifactRegistry (which persists files during pipeline execution) with the DuckDB artifacts table so that load_artifact() and chain replay can find them.

If an artifact with the same artifact_id already exists the call is silently ignored (idempotent).

Parameters:
  • artifact_id – Artifact identifier (may be V3 format).

  • path – Relative path within the artifacts/ directory.

  • content_hash – SHA-256 hex digest of the serialised content.

  • operator_class – Class name of the operator that produced it.

  • artifact_type – Category label ("model", "transformer", …).

  • format – Serialisation format ("joblib", "cloudpickle", …).

  • size_bytes – Size of the serialised content in bytes.

Returns:

The artifact_id that was registered.

replay_chain(chain_id: str, X: ndarray, wavelengths: ndarray | None = None) -> ndarray

Replay a stored chain on new data to produce predictions.

Loads each step’s artifact, applies the transformation in order, and for the model step loads all fold models and returns the averaged prediction.

This is the primary in-workspace prediction path. For out-of-workspace prediction, export to .n4a first.
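The replay logic described above (apply each transform in order, then average the fold models) can be sketched in pure Python. The callables are toy stand-ins for loaded artifacts, `replay` is a hypothetical helper, and an unweighted mean across folds is assumed.

```python
from typing import Callable, Sequence

Vector = list[float]
Matrix = list[Vector]


def replay(
    X: Matrix,
    transforms: Sequence[Callable[[Matrix], Matrix]],
    fold_models: Sequence[Callable[[Matrix], Vector]],
) -> Vector:
    """Apply transforms in order, then average the fold models' predictions."""
    for transform in transforms:
        X = transform(X)
    per_fold = [model(X) for model in fold_models]
    n_folds = len(per_fold)
    # zip(*per_fold) groups the predictions for each sample across folds.
    return [sum(values) / n_folds for values in zip(*per_fold)]


# Toy stand-ins: one centering step and two "fold models".
def center(rows: Matrix) -> Matrix:
    return [[v - 1.0 for v in row] for row in rows]


def model_a(rows: Matrix) -> Vector:
    return [sum(row) for row in rows]


def model_b(rows: Matrix) -> Vector:
    return [sum(row) + 2.0 for row in rows]


predictions = replay([[1.0, 2.0], [3.0, 4.0]], [center], [model_a, model_b])
```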

Parameters:
  • chain_id – Chain to replay.

  • X – Input feature matrix (n_samples x n_features).

  • wavelengths – Optional wavelength array for wavelength-aware operators. Required if any step in the chain uses SpectraTransformerMixin.

Returns:

Predicted values as a 1-D numpy.ndarray of shape (n_samples,).

Raises:

save_artifact(obj: Any, operator_class: str, artifact_type: str, format: str) -> str

Persist a binary artifact (fitted model or transformer).

The object is serialised to disk in the artifacts/ directory using content-addressed storage. If an identical artifact already exists (same content hash), the existing entry is reused and its reference count incremented.

Parameters:
  • obj – The Python object to persist (e.g. a fitted sklearn.preprocessing.StandardScaler).

  • operator_class – Fully qualified class name of the operator that produced this artifact.

  • artifact_type – Category label ("model", "transformer", "scaler", etc.).

  • format – Serialisation format hint ("joblib", "cloudpickle", "keras_h5", etc.).

Returns:

A unique artifact identifier. If the content already existed, the same identifier is returned (content-addressed deduplication).
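Content-addressed deduplication with reference counting can be sketched with an in-memory index. `InMemoryArtifactIndex` is hypothetical, the content digest doubles as the identifier (an assumption), and deterministic JSON stands in for the joblib/cloudpickle serialisers so that equal objects produce equal bytes.

```python
import hashlib
import json


class InMemoryArtifactIndex:
    """Toy content-addressed index: digest -> (payload bytes, reference count)."""

    def __init__(self) -> None:
        self.entries: dict[str, tuple[bytes, int]] = {}

    def save(self, obj: object) -> str:
        # Deterministic JSON is a stand-in for the real serialisation format.
        payload = json.dumps(obj, sort_keys=True).encode("utf-8")
        digest = hashlib.sha256(payload).hexdigest()
        if digest in self.entries:
            stored, count = self.entries[digest]
            self.entries[digest] = (stored, count + 1)  # reuse, bump ref count
        else:
            self.entries[digest] = (payload, 1)
        return digest


index = InMemoryArtifactIndex()
first_id = index.save({"mean": 0.5, "scale": 2.0})
second_id = index.save({"mean": 0.5, "scale": 2.0})
other_id = index.save({"mean": 0.0, "scale": 1.0})
```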

save_chain(pipeline_id: str, steps: list[dict], model_step_idx: int, model_class: str, preprocessings: str, fold_strategy: str, fold_artifacts: dict, shared_artifacts: dict, branch_path: list[int] | None = None, source_index: int | None = None) -> str

Store a preprocessing-to-model chain.

A chain captures the complete, ordered sequence of steps (transformers and model) that were executed during training, together with references to the fitted artifacts for each fold. Chains are the unit of export and replay.

Parameters:
  • pipeline_id – Parent pipeline identifier.

  • steps – Ordered list of step descriptors. Each dict should contain at minimum:

    {
        "step_idx": int,
        "operator_class": str,
        "params": dict,
        "artifact_id": str | None,
        "stateless": bool,
    }

  • model_step_idx – Index (within steps) of the model step.

  • model_class – Fully qualified class name of the model (e.g. "sklearn.cross_decomposition.PLSRegression").

  • preprocessings – Short display string summarising the preprocessing chain (e.g. "SNV>Detr>MinMax").

  • fold_strategy – Cross-validation fold strategy identifier (e.g. "per_fold" or "shared").

  • fold_artifacts – Mapping from fold identifier (string) to the artifact ID of the model trained on that fold. Example: {"fold_0": "art_abc123", "fold_1": "art_def456"}.

  • shared_artifacts – Mapping from step index (string) to a list of artifact IDs of shared (non-fold-specific) fitted objects. Example: {"0": ["art_scaler_abc"], "3": ["art_snv", "art_sg"]}.

  • branch_path – For branching pipelines, the path of branch indices leading to this chain. None for non-branching pipelines.

  • source_index – For multi-source pipelines, the source index this chain belongs to. None for single-source pipelines.

Returns:

A unique chain identifier (UUID-based string).
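To make the argument shapes concrete, the sketch below builds one chain description and checks it for obvious inconsistencies before it would be passed to save_chain(). `validate_chain_spec` is a hypothetical helper, the artifact IDs are the illustrative ones used above, and treating model_step_idx as a position in the steps list is an assumption.

```python
REQUIRED_STEP_KEYS = {"step_idx", "operator_class", "params", "artifact_id", "stateless"}


def validate_chain_spec(spec: dict) -> list[str]:
    """Return a list of problems found in a chain description (empty if none)."""
    problems = []
    steps = spec["steps"]
    for position, step in enumerate(steps):
        missing = REQUIRED_STEP_KEYS - step.keys()
        if missing:
            problems.append(f"step {position} is missing keys: {sorted(missing)}")
    if not 0 <= spec["model_step_idx"] < len(steps):
        problems.append("model_step_idx is out of range")
    for step_key in spec["shared_artifacts"]:
        # Keys are step indices encoded as strings, e.g. "0" or "3".
        if not step_key.isdigit() or int(step_key) >= len(steps):
            problems.append(f"shared_artifacts key {step_key!r} is not a valid step index")
    return problems


chain_spec = {
    "steps": [
        {"step_idx": 0, "operator_class": "sklearn.preprocessing.StandardScaler",
         "params": {}, "artifact_id": "art_scaler_abc", "stateless": False},
        {"step_idx": 1, "operator_class": "sklearn.cross_decomposition.PLSRegression",
         "params": {"n_components": 10}, "artifact_id": None, "stateless": False},
    ],
    "model_step_idx": 1,
    "fold_artifacts": {"fold_0": "art_abc123", "fold_1": "art_def456"},
    "shared_artifacts": {"0": ["art_scaler_abc"]},
}
problems = validate_chain_spec(chain_spec)
```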

save_prediction(pipeline_id: str, chain_id: str, dataset_name: str, model_name: str, model_class: str, fold_id: str, partition: str, val_score: float, test_score: float, train_score: float, metric: str, task_type: str, n_samples: int, n_features: int, scores: dict, best_params: dict, branch_id: int | None, branch_name: str | None, exclusion_count: int, exclusion_rate: float, preprocessings: str = '', prediction_id: str | None = None) -> str

Store a single prediction record.

A prediction captures the result of evaluating one model on one fold/partition combination: the scalar scores, the model identity, the preprocessing summary, and links back to the pipeline and chain that produced it.

Arrays (y_true, y_pred, etc.) are stored separately via save_prediction_arrays().

Parameters:
  • pipeline_id – Parent pipeline identifier.

  • chain_id – Chain that produced this prediction.

  • dataset_name – Name of the dataset.

  • model_name – Short model name (e.g. "PLSRegression").

  • model_class – Fully qualified model class name.

  • fold_id – Fold identifier (e.g. "fold_0", "avg").

  • partition – Data partition ("train", "val", "test").

  • val_score – Validation score (primary ranking metric).

  • test_score – Test score (for reporting).

  • train_score – Training score (for overfitting diagnostics).

  • metric – Name of the metric (e.g. "rmse", "r2").

  • task_type – "regression" or "classification".

  • n_samples – Number of samples in this partition.

  • n_features – Number of features (wavelengths).

  • scores – Nested dict of all computed scores per partition. Example: {"val": {"rmse": 0.12, "r2": 0.95}, ...}.

  • best_params – Best hyperparameters found (if tuning was used).

  • branch_id – Branch index (0-based) or None.

  • branch_name – Human-readable branch name or None.

  • exclusion_count – Number of samples excluded by outlier filters.

  • exclusion_rate – Fraction of samples excluded (0.0 – 1.0).

  • preprocessings – Short display string for the preprocessing chain applied before this model.

Returns:

A unique prediction identifier (UUID-based string).

save_prediction_arrays(prediction_id: str, y_true: ndarray | None, y_pred: ndarray | None, y_proba: ndarray | None = None, sample_indices: ndarray | None = None, weights: ndarray | None = None) -> None

Store the dense arrays associated with a prediction.

Arrays are stored as native DOUBLE[] columns in DuckDB, enabling zero-copy Arrow transfer to Polars and efficient columnar compression.

For classification tasks, y_proba holds the class-probability matrix (shape n_samples x n_classes), flattened to 1-D for storage with the original shape recorded for reconstruction.

Parameters:
  • prediction_id – Prediction identifier returned by save_prediction().

  • y_true – Ground-truth values (1-D array).

  • y_pred – Predicted values (1-D array).

  • y_proba – Class probabilities (2-D array, flattened for storage). None for regression tasks.

  • sample_indices – Original dataset indices of the samples in this partition/fold.

  • weights – Per-sample weights, if applicable.
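The flatten-and-record-shape scheme for y_proba can be sketched with plain lists. Row-major order is an assumption, both helpers are hypothetical, and the store itself works with numpy arrays rather than nested lists.

```python
def flatten_with_shape(matrix: list[list[float]]) -> tuple[list[float], tuple[int, int]]:
    """Flatten a 2-D matrix row by row and return it with its original shape."""
    n_rows = len(matrix)
    n_cols = len(matrix[0]) if matrix else 0
    flat = [value for row in matrix for value in row]
    return flat, (n_rows, n_cols)


def restore(flat: list[float], shape: tuple[int, int]) -> list[list[float]]:
    """Rebuild the 2-D matrix from the flat values and the recorded shape."""
    n_rows, n_cols = shape
    return [flat[i * n_cols:(i + 1) * n_cols] for i in range(n_rows)]


# Three samples, two classes.
y_proba = [[0.9, 0.1], [0.2, 0.8], [0.5, 0.5]]
flat_values, original_shape = flatten_with_shape(y_proba)
round_trip = restore(flat_values, original_shape)
```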

top_predictions(n: int, metric: str = 'val_score', ascending: bool = True, partition: str = 'val', dataset_name: str | None = None, group_by: str | None = None) -> DataFrame

Return the top-N predictions ranked by a score column.

This is the primary ranking interface for finding the best models across a workspace.

Parameters:
  • n – Number of top predictions to return. When group_by is set, returns top n per group.

  • metric – Column to rank by ("val_score", "test_score", "train_score").

  • ascending – If True (default), lower values rank higher (appropriate for error metrics like RMSE). Set to False for higher-is-better metrics like R2.

  • partition – Only consider predictions from this partition.

  • dataset_name – Optional dataset filter.

  • group_by – Optional grouping column (e.g. "model_class", "dataset_name"). When set, the result contains top n rows per distinct value of this column.

Returns:

A polars.DataFrame with the top predictions.

vacuum() -> None

Reclaim unused space in the DuckDB database file.

Equivalent to VACUUM in SQL. Call after large deletions to reduce the on-disk size of store.duckdb.

property workspace_path: Path

Root directory of the workspace.