nirs4all.pipeline.storage.manifest_manager module

Manifest Manager - Pipeline manifest and dataset index management

Manages pipeline manifests with sequential numbering and content-addressed artifacts. Provides centralized pipeline registration, lookup, and lifecycle management.

Architecture (v2):

workspace/
├── binaries/<dataset>/          # Centralized artifact storage
│   ├── model_PLSRegression_abc123.joblib
│   └── transformer_StandardScaler_def456.pkl
└── runs/<dataset>/
    ├── 0001_pls_abc123/
    │   └── manifest.yaml        # References artifacts by path
    ├── 0002_rf_def456/
    │   └── manifest.yaml
    └── predictions.json         # Global predictions
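
To make the layout concrete, the following sketch builds an empty copy of the tree in a temporary directory using only the standard library. It does not call nirs4all; the helper name build_demo_workspace and the dataset name "wheat" are hypothetical and chosen for illustration.

```python
import tempfile
from pathlib import Path


def build_demo_workspace(root: Path, dataset: str = "wheat") -> Path:
    """Create an empty copy of the v2 layout under root (illustrative only)."""
    binaries = root / "binaries" / dataset
    runs = root / "runs" / dataset
    binaries.mkdir(parents=True)
    # One directory per pipeline, each holding its own manifest.yaml.
    for pipeline_id in ("0001_pls_abc123", "0002_rf_def456"):
        pipeline_dir = runs / pipeline_id
        pipeline_dir.mkdir(parents=True)
        (pipeline_dir / "manifest.yaml").touch()
    # predictions.json sits next to the pipeline directories.
    (runs / "predictions.json").write_text("[]")
    return runs


with tempfile.TemporaryDirectory() as tmp:
    runs_dir = build_demo_workspace(Path(tmp))
    pipeline_dirs = sorted(p.name for p in runs_dir.iterdir() if p.is_dir())
    has_predictions = (runs_dir / "predictions.json").is_file()
```

Here pipeline_dirs contains the two numbered pipeline directories, while predictions.json is a plain file beside them.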

Manifest Schema Versions:

  • v1.0: Legacy format with flat artifacts list

  • v2.0: New format with structured artifacts section and schema_version field

class nirs4all.pipeline.storage.manifest_manager.ManifestManager(results_dir: str | Path)

Bases: object

Manage pipeline manifests with sequential numbering.

This class handles:
  • Creating new pipelines with sequential numbering (0001_hash, 0002_hash)

  • Saving/loading pipeline manifests

  • Content-addressed artifact storage

add_run_checkpoint(run_id: str, result_id: str, metadata: Dict[str, Any] | None = None) → None

Add a checkpoint recording a completed result.

Parameters:
  • run_id – Run identifier

  • result_id – Result/pipeline identifier that completed

  • metadata – Optional additional metadata for the checkpoint

append_artifacts(pipeline_id: str, artifacts: List[dict]) → None

Append artifacts to a pipeline manifest.

Supports both v1 (list) and v2 (dict with items) artifact formats.

Parameters:
  • pipeline_id – Pipeline ID

  • artifacts – List of artifact metadata dictionaries

append_artifacts_v2(pipeline_id: str, records: List[Dict[str, Any]]) → None

Append v2 ArtifactRecords to a pipeline manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • records – List of ArtifactRecord instances or dicts

append_prediction(pipeline_id: str, prediction: dict) → None

Append a prediction record to a pipeline manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • prediction – Prediction metadata dictionary

artifact_exists(content_hash: str) → bool

Check if artifact exists in storage.

Parameters:

content_hash – Content hash to check

Returns:

True if artifact exists

create_pipeline(name: str, dataset: str, pipeline_config: dict, pipeline_hash: str, metadata: dict | None = None, generator_choices: List[Dict[str, Any]] | None = None, dataset_info: Dict[str, Any] | None = None) → tuple[str, Path]

Create a new pipeline with sequential numbering.

Parameters:
  • name – Pipeline name (for human reference)

  • dataset – Dataset name

  • pipeline_config – Pipeline configuration dict

  • pipeline_hash – Hash of pipeline config (first 6 chars)

  • metadata – Optional initial metadata

  • generator_choices – List of generator choices that produced this pipeline. Each choice is a dict like {"_or_": selected_value} or {"_range_": 18}.

  • dataset_info – Optional dataset version info for run tracking (Phase 7). Expected format: {"path": str, "hash": str, "version_at_run": int}

Returns:

Tuple of (pipeline_id, pipeline_dir). The pipeline_id format is “0001_abc123” or “0001_name_abc123”.
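
The two ID formats above can be reproduced with a small formatting helper. This is a sketch under assumptions, not the library's implementation: format_pipeline_id is a hypothetical name, and the rule for when the name is included (here, whenever it is non-empty) is a guess.

```python
def format_pipeline_id(number: int, pipeline_hash: str, name: str = "") -> str:
    """Build an ID such as '0001_abc123' or '0001_name_abc123' (assumed rule)."""
    # Zero-padded sequential number, four digits wide.
    parts = [f"{number:04d}"]
    # Assumption: the name is included only when one is given.
    if name:
        parts.append(name)
    # Only the first 6 characters of the hash are used in the ID.
    parts.append(pipeline_hash[:6])
    return "_".join(parts)


plain_id = format_pipeline_id(1, "abc123")
named_id = format_pipeline_id(1, "abc123def456", name="name")
```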

create_run_manifest(run_id: str, name: str, templates: List[Dict[str, Any]], datasets: List[Dict[str, Any]], config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None) → Path

Create a run-level manifest for tracking experiment sessions.

A run combines pipeline templates with datasets and generates results for every combination of expanded pipeline configurations and datasets.

Parameters:
  • run_id – Unique identifier for the run (e.g., “2026-01-09_experiment_abc123”)

  • name – Human-readable name for the run

  • templates – List of template metadata dictionaries with:
      - id: Template ID
      - name: Template name
      - original_template: The unexpanded template definition
      - expansion_count: Number of configs generated
      - file_path: Optional path to template file

  • datasets – List of dataset metadata dictionaries with:
      - name: Dataset name
      - path: Dataset path
      - hash: Dataset content hash
      - file_size: File size in bytes
      - n_samples, n_features: Dimensions
      - y_stats: Target variable statistics

  • config – Run configuration (cv_folds, metric, etc.)

  • metadata – Additional user metadata

Returns:

Path to the created run manifest file

Example

>>> manager = ManifestManager(results_dir)
>>> run_path = manager.create_run_manifest(
...     run_id="2026-01-09_protein_abc123",
...     name="Protein Content Optimization",
...     templates=[{
...         "id": "t1",
...         "name": "PLS Variants",
...         "original_template": [{"_or_": [...]}, ...],
...         "expansion_count": 6
...     }],
...     datasets=[{
...         "name": "Wheat",
...         "path": "/data/wheat.csv",
...         "hash": "abc123"
...     }],
...     config={"cv_folds": 5, "metric": "r2"}
... )
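
Because a run produces one result per combination of expanded configuration and dataset, the expected number of results follows from the template and dataset metadata. The sketch below is illustrative only: enumerate_results is a hypothetical helper, and the second dataset ("Corn") is made up for the example.

```python
from itertools import product
from typing import Any, Dict, Iterator, List, Tuple


def enumerate_results(
    templates: List[Dict[str, Any]], datasets: List[Dict[str, Any]]
) -> Iterator[Tuple[str, int, str]]:
    """Yield (template_id, config_index, dataset_name) for every combination."""
    for template in templates:
        config_indices = range(template["expansion_count"])
        for config_index, dataset in product(config_indices, datasets):
            yield (template["id"], config_index, dataset["name"])


templates = [{"id": "t1", "name": "PLS Variants", "expansion_count": 6}]
datasets = [{"name": "Wheat"}, {"name": "Corn"}]
results = list(enumerate_results(templates, datasets))
```

With one template expanding to 6 configurations and 2 datasets, the sketch yields 6 × 2 = 12 combinations.
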

delete_pipeline(pipeline_id: str) → None

Delete pipeline directory and manifest.

Parameters:

pipeline_id – Pipeline ID to delete

extract_all_generator_choices(prediction: Dict[str, Any], instantiate: bool = False, verbose: bool = False) → List[Any]

Extract all generator choices from a prediction’s pipeline manifest.

Similar to extract_generator_choice but returns all choices at once.

Parameters:
  • prediction – Prediction dictionary with ‘pipeline_uid’ field.

  • instantiate – If True, deserialize all choice values into Python objects. If False, return raw JSON values.

  • verbose – If True, print debug information.

Returns:

List of choice values (JSON or instantiated objects). Empty list if no choices are available.

Example

>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> all_choices = manager.extract_all_generator_choices(top_pred)
>>> # all_choices = ["StandardScaler", {'class': '...', 'params': {...}}]

extract_generator_choice(prediction: Dict[str, Any], choice_index: int, instantiate: bool = False, verbose: bool = False) → Any | None

Extract a specific generator choice from a prediction’s pipeline manifest.

Given a prediction (from predictions.top() or similar), loads the corresponding pipeline manifest and returns the generator choice at the specified index.

Generator choices are stored in the manifest’s generator_choices field, which is a list of dicts like:

[{"_or_": "StandardScaler"}, {"_range_": 18}, {"_or_": {…}}]

This method allows extracting the value of a specific choice, either as the raw JSON node (for re-use in pipeline specs) or as an instantiated Python object.

Parameters:
  • prediction – Prediction dictionary with ‘pipeline_uid’ field.

  • choice_index – Index of the choice in the generator_choices list (0-based).

  • instantiate – If True, deserialize the choice value into a Python object. If False, return the raw JSON value.

  • verbose – If True, print debug information.

Returns:

The choice value (JSON or instantiated object), or None if:
  • The prediction has no pipeline_uid

  • The manifest doesn’t exist or has no generator_choices

  • The choice_index is out of range

Example

>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> # Get raw JSON of first choice
>>> scaler_spec = manager.extract_generator_choice(top_pred, 0)
>>> # scaler_spec = "sklearn.preprocessing._data.StandardScaler"
>>>
>>> # Get instantiated object
>>> scaler = manager.extract_generator_choice(top_pred, 0, instantiate=True)
>>> # scaler = StandardScaler()
>>>
>>> # Get second choice (e.g., model spec)
>>> model_spec = manager.extract_generator_choice(top_pred, 1)
>>> # model_spec = {'class': '...PLSRegression', 'params': {'n_components': 3}}
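
The raw (non-instantiated) lookup can be pictured as unwrapping single-key dicts. The sketch below works on an in-memory generator_choices list rather than a manifest on disk; choice_value and all_choice_values are hypothetical names, and the assumption is that each choice dict carries exactly one key such as "_or_" or "_range_". Negative indices are treated as out of range here, which may differ from the real method.

```python
from typing import Any, Dict, List, Optional


def choice_value(
    generator_choices: List[Dict[str, Any]], choice_index: int
) -> Optional[Any]:
    """Return the value of the choice at choice_index, or None if out of range."""
    if not 0 <= choice_index < len(generator_choices):
        return None
    choice = generator_choices[choice_index]
    # Assumption: one key per choice dict, so take its only value.
    return next(iter(choice.values()), None)


def all_choice_values(generator_choices: List[Dict[str, Any]]) -> List[Any]:
    """Return every choice value in order, skipping empty dicts."""
    return [next(iter(c.values())) for c in generator_choices if c]


choices = [{"_or_": "StandardScaler"}, {"_range_": 18}]
first = choice_value(choices, 0)
second = choice_value(choices, 1)
missing = choice_value(choices, 5)
everything = all_choice_values(choices)
```
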

extract_top_preprocessings(predictions: List[Dict[str, Any]], top_k: int = 3, step_name: str = 'feature_augmentation', exclude_scalers: bool = True, verbose: bool = False) → List[List[Any]]

Extract top K unique preprocessing pipelines from ranked predictions.

Given a list of predictions (typically from predictions.top()), extracts the preprocessing pipeline that was actually used for each prediction by parsing the display string and deserializing the transformers.

Iterates through ALL predictions until top_k unique preprocessings are found. This ensures we get the best-performing unique preprocessings even if the top predictions share the same preprocessing (e.g., different folds).

This method is designed for pipeline chaining: run pipeline 1, get top predictions, extract their preprocessings, use in pipeline 2.

Parameters:
  • predictions – List of prediction dictionaries, typically from predictions.top(n=…, rank_metric="rmse"). Should be sorted by score (best first). Each prediction must have:
      - 'preprocessings': display string (e.g., "ExtendedMSC>Detr>MinMax")

  • top_k – Number of unique preprocessings to extract. Will iterate through all predictions until this many unique preprocessings are found.

  • step_name – Unused, kept for backward compatibility.

  • exclude_scalers – If True, remove scaler transformers from each pipeline.

  • verbose – If True, print tracing information.

Returns:

List of up to top_k unique preprocessing pipelines. Each pipeline is a list of transformer instances ready for use in pipeline config.

Example

>>> manager = ManifestManager(runs_dir)
>>> top_preds = predictions.top(n=50, rank_metric="rmse")  # Get many predictions
>>> top_pp = manager.extract_top_preprocessings(top_preds, top_k=3)
>>> # top_pp = [[ExtendedMSC(), Detrend()], [SNV()], [MSC(), FirstDer()]]
>>> # Use in next pipeline:
>>> pipeline = [{"feature_augmentation": {"_or_": top_pp}}, ...]
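
The "iterate until top_k unique preprocessings are found" behaviour can be sketched as an order-preserving de-duplication over the display strings. This simplified version is an assumption-laden illustration: top_unique_preprocessings is a hypothetical helper, it returns the step names split on ">" instead of deserialized transformer instances, and it does not remove scalers.

```python
from typing import Any, Dict, List


def top_unique_preprocessings(
    predictions: List[Dict[str, Any]], top_k: int = 3
) -> List[List[str]]:
    """Collect the first top_k distinct preprocessing chains, best first."""
    seen = set()
    unique: List[List[str]] = []
    for prediction in predictions:
        display = prediction.get("preprocessings")
        # Skip predictions without a display string or with one already seen.
        if not display or display in seen:
            continue
        seen.add(display)
        unique.append(display.split(">"))
        if len(unique) == top_k:
            break
    return unique


ranked = [
    {"preprocessings": "ExtendedMSC>Detr>MinMax"},
    {"preprocessings": "ExtendedMSC>Detr>MinMax"},  # same chain, another fold
    {"preprocessings": "SNV"},
    {"preprocessings": "MSC>FirstDer"},
    {"preprocessings": "Gauss"},
]
top_chains = top_unique_preprocessings(ranked, top_k=3)
```

The duplicate second entry is skipped, so three distinct chains are returned even though they span the first four predictions.
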

get_artifact_path(content_hash: str) → Path

Get path for content-addressed artifact.

Parameters:

content_hash – Content hash of artifact

Returns:

Path to artifact in artifacts/objects/<hash[:2]>/<hash>
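
A content-addressed path of this kind shards artifacts into subdirectories named after the first two characters of the hash. The sketch below illustrates the idea; artifact_path is a hypothetical helper, and the use of SHA-256 and of "workspace" as the root directory are assumptions, since this page does not state which hash function or base directory the library uses.

```python
import hashlib
from pathlib import Path


def artifact_path(root: Path, content_hash: str) -> Path:
    """Map a content hash to root/artifacts/objects/<first two chars>/<hash>."""
    return root / "artifacts" / "objects" / content_hash[:2] / content_hash


# Assumption: SHA-256 over the serialized bytes stands in for the real hash.
content_hash = hashlib.sha256(b"serialized model bytes").hexdigest()
path = artifact_path(Path("workspace"), content_hash)
```

Identical content always hashes to the same path, so storing the same artifact twice does not create a second file.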

get_artifacts_list(manifest: Dict[str, Any]) → List[Dict[str, Any]]

Get artifacts as a flat list regardless of schema version.

Parameters:

manifest – Manifest dictionary

Returns:

List of artifact metadata dictionaries
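
Since v1 manifests store artifacts as a list and v2 manifests store a dict with items, a version-agnostic accessor only needs to branch on the container type. A minimal sketch, assuming the manifest keys are "artifacts" and "items"; artifacts_as_list is a hypothetical name:

```python
from typing import Any, Dict, List


def artifacts_as_list(manifest: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return artifacts as a flat list for both v1 and v2 manifests."""
    artifacts = manifest.get("artifacts", [])
    if isinstance(artifacts, dict):
        # v2: structured section, records live under "items" (assumed key).
        return list(artifacts.get("items", []))
    # v1: already a flat list.
    return list(artifacts)


v1_manifest = {"artifacts": [{"name": "scaler"}]}
v2_manifest = {"schema_version": "2.0", "artifacts": {"items": [{"name": "model"}]}}
v1_items = artifacts_as_list(v1_manifest)
v2_items = artifacts_as_list(v2_manifest)
```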

get_latest_execution_trace(pipeline_id: str) → ExecutionTrace | None

Get the most recent execution trace for a pipeline.

Parameters:

pipeline_id – Pipeline ID

Returns:

Most recent ExecutionTrace or None if none exist

get_next_pipeline_number(run_dir: Path | None = None) → int

Get next sequential pipeline number for workspace runs.

Counts existing pipeline directories (excludes _binaries).

Parameters:

run_dir – Run directory to count pipelines in. If None, uses results_dir.

Returns:

Next number (e.g., 1, 2, 3…)
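
The counting rule described above can be sketched with pathlib: count the subdirectories, skip _binaries, and add one. next_pipeline_number is a hypothetical stand-in, not the library's code, and the directory names are taken from the architecture diagram for illustration.

```python
import tempfile
from pathlib import Path


def next_pipeline_number(run_dir: Path) -> int:
    """Count pipeline directories (ignoring _binaries and files) and add one."""
    existing = [
        p for p in run_dir.iterdir() if p.is_dir() and p.name != "_binaries"
    ]
    return len(existing) + 1


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp)
    for name in ("0001_pls_abc123", "0002_rf_def456", "_binaries"):
        (run_dir / name).mkdir()
    # Plain files such as predictions.json are not counted.
    (run_dir / "predictions.json").write_text("[]")
    next_number = next_pipeline_number(run_dir)
```

One property of a purely count-based scheme like this sketch is that deleting a pipeline directory lowers the count, so a number could be handed out again; whether the library guards against that is not stated here.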

get_pipeline_path(pipeline_id: str) → Path

Get the directory path for a pipeline.

Parameters:

pipeline_id – Pipeline ID

Returns:

Path to pipeline directory

static get_schema_version(manifest: Dict[str, Any]) → str

Detect manifest schema version.

Parameters:

manifest – Manifest dictionary

Returns:

Schema version string (“1.0” or “2.0”)
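
Given that only v2.0 manifests carry a schema_version field, detection can be sketched as a lookup with a legacy default. This is an assumption about the logic, not a copy of it; detect_schema_version is a hypothetical name.

```python
from typing import Any, Dict


def detect_schema_version(manifest: Dict[str, Any]) -> str:
    """Return the declared schema version, defaulting to '1.0' when absent."""
    version = manifest.get("schema_version")
    if version is None:
        # Assumption: legacy manifests have no schema_version field.
        return "1.0"
    # Normalize in case the YAML loader parsed the value as a float.
    return str(version)


legacy_version = detect_schema_version({"artifacts": []})
current_version = detect_schema_version({"schema_version": "2.0"})
```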

list_all_pipelines() → List[Dict[str, Any]]

List all pipelines in this run.

Returns:

List of pipeline info dictionaries

list_execution_traces(pipeline_id: str) → List[str]

List all execution trace IDs for a pipeline.

Parameters:

pipeline_id – Pipeline ID

Returns:

List of trace IDs

list_pipelines() → List[str]

List all pipeline IDs in this run.

Returns:

List of pipeline IDs (e.g., ["0001_abc123", "0002_def456"])

list_runs() → List[Dict[str, Any]]

List all runs in this workspace.

Returns:

List of run summary dictionaries

load_execution_trace(pipeline_id: str, trace_id: str) → ExecutionTrace | None

Load a specific execution trace from the pipeline manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • trace_id – Trace ID to load

Returns:

ExecutionTrace instance or None if not found

load_manifest(pipeline_id: str) → dict

Load manifest YAML file.

Parameters:

pipeline_id – Pipeline ID (e.g., “0001_abc123”)

Returns:

Manifest dictionary

Raises:

FileNotFoundError – If manifest doesn’t exist

load_run_manifest(run_id: str) → Dict[str, Any] | None

Load a run manifest.

Parameters:

run_id – Run identifier

Returns:

Run manifest dictionary or None if not found

pipeline_exists(pipeline_id: str) → bool

Check if a pipeline exists.

Parameters:

pipeline_id – Pipeline ID

Returns:

True if manifest exists

save_execution_trace(pipeline_id: str, trace: ExecutionTrace) → None

Save an execution trace to the pipeline manifest.

Execution traces record the exact path through the pipeline that produced a prediction, enabling deterministic replay for prediction, transfer, and export.

Parameters:
  • pipeline_id – Pipeline ID

  • trace – ExecutionTrace instance to save

Note

The trace is stored in the manifest under “execution_traces” keyed by trace_id.
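
Keying traces by trace_id under "execution_traces" means saving, listing, and loading reduce to dictionary operations on the manifest. The sketch below works on an in-memory manifest dict and represents a trace as a plain dict; both simplifications are assumptions, as are the helper names and the example trace fields.

```python
from typing import Any, Dict, List, Optional


def save_trace(manifest: Dict[str, Any], trace_id: str, trace: Dict[str, Any]) -> None:
    """Store a trace under manifest['execution_traces'][trace_id]."""
    manifest.setdefault("execution_traces", {})[trace_id] = trace


def list_trace_ids(manifest: Dict[str, Any]) -> List[str]:
    """Return all stored trace IDs in insertion order."""
    return list(manifest.get("execution_traces", {}))


def load_trace(manifest: Dict[str, Any], trace_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored trace, or None if the ID is unknown."""
    return manifest.get("execution_traces", {}).get(trace_id)


manifest: Dict[str, Any] = {"schema_version": "2.0"}
save_trace(manifest, "trace_a", {"steps": [1, 2, 3]})
save_trace(manifest, "trace_b", {"steps": [1, 4]})
trace_ids = list_trace_ids(manifest)
loaded = load_trace(manifest, "trace_b")
unknown = load_trace(manifest, "trace_z")
```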

save_manifest(pipeline_id: str, manifest: dict) → None

Save manifest YAML file.

Parameters:
  • pipeline_id – Pipeline ID (e.g., “0001_abc123”)

  • manifest – Complete manifest dictionary

update_manifest(pipeline_id: str, updates: dict) → None

Update specific fields in a manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • updates – Dictionary of fields to update

update_run_status(run_id: str, status: str, summary_updates: Dict[str, Any] | None = None) → None

Update run status and optionally summary.

Parameters:
  • run_id – Run identifier

  • status – New status (queued, running, completed, failed, paused, cancelled)

  • summary_updates – Optional summary field updates
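
As an illustration of how the six listed statuses might be enforced, the sketch below validates the status before applying it to an in-memory run manifest. Whether the real method validates at all is not stated on this page; apply_run_status, the "status" and "summary" keys, and the example summary field are all assumptions.

```python
from typing import Any, Dict, Optional

VALID_STATUSES = {"queued", "running", "completed", "failed", "paused", "cancelled"}


def apply_run_status(
    run_manifest: Dict[str, Any],
    status: str,
    summary_updates: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Set the status and merge summary fields into the run manifest."""
    if status not in VALID_STATUSES:
        raise ValueError(f"Unknown run status: {status!r}")
    run_manifest["status"] = status
    if summary_updates:
        # Shallow merge: existing summary keys are kept unless overwritten.
        run_manifest.setdefault("summary", {}).update(summary_updates)
    return run_manifest


run = {"status": "running", "summary": {"completed_results": 3}}
apply_run_status(run, "completed", {"completed_results": 12})
```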

upgrade_manifest_to_v2(pipeline_id: str) → None

Upgrade a v1 manifest to v2 format in place.

Parameters:

pipeline_id – Pipeline ID to upgrade
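
Based on the schema differences described at the top of this page, an upgrade amounts to wrapping the flat artifacts list in a structured section and adding schema_version. The sketch below operates on a copy of an in-memory dict instead of rewriting the manifest file in place, and the exact v2 structure (a dict with an "items" key) is inferred from the append_artifacts description; the real method may add further fields.

```python
import copy
from typing import Any, Dict


def upgrade_to_v2(manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Return a v2-shaped copy of a v1 manifest (illustrative structure only)."""
    upgraded = copy.deepcopy(manifest)
    artifacts = upgraded.get("artifacts", [])
    if isinstance(artifacts, list):
        # v1 flat list becomes the "items" of the structured v2 section.
        upgraded["artifacts"] = {"items": artifacts}
    upgraded["schema_version"] = "2.0"
    return upgraded


v1 = {"name": "pls", "artifacts": [{"name": "scaler"}]}
v2 = upgrade_to_v2(v1)
```

Applying the function to an already upgraded manifest leaves it unchanged, so the sketch is idempotent.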