nirs4all.pipeline.storage package

Module contents

Storage and persistence layer for pipeline artifacts and results.

This module handles all I/O operations, including:
  • Pipeline artifact storage (binary models, scalers, etc.)

  • Manifest and metadata management

  • Workspace exports and imports

  • Prediction resolution and saving

  • Pipeline template library

class nirs4all.pipeline.storage.ManifestManager(results_dir: str | Path)

Bases: object

Manage pipeline manifests with sequential numbering.

This class handles:
  • Creating new pipelines with sequential numbering (0001_hash, 0002_hash)

  • Saving and loading pipeline manifests

  • Content-addressed artifact storage

append_artifacts(pipeline_id: str, artifacts: List[dict]) → None

Append artifacts to a pipeline manifest.

Supports both v1 (list) and v2 (dict with items) artifact formats.

Parameters:
  • pipeline_id – Pipeline ID

  • artifacts – List of artifact metadata dictionaries

append_artifacts_v2(pipeline_id: str, records: List[Dict[str, Any]]) → None

Append v2 ArtifactRecords to a pipeline manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • records – List of ArtifactRecord instances or dicts

append_prediction(pipeline_id: str, prediction: dict) → None

Append a prediction record to the pipeline manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • prediction – Prediction metadata dictionary

artifact_exists(content_hash: str) → bool

Check whether an artifact exists in storage.

Parameters:

content_hash – Content hash to check

Returns:

True if artifact exists

create_pipeline(name: str, dataset: str, pipeline_config: dict, pipeline_hash: str, metadata: dict | None = None, generator_choices: List[Dict[str, Any]] | None = None) → tuple[str, Path]

Create a new pipeline with sequential numbering.

Parameters:
  • name – Pipeline name (for human reference)

  • dataset – Dataset name

  • pipeline_config – Pipeline configuration dict

  • pipeline_hash – Hash of pipeline config (first 6 chars)

  • metadata – Optional initial metadata

  • generator_choices – List of generator choices that produced this pipeline. Each choice is a dict like {"_or_": selected_value} or {"_range_": 18}.

Returns:

Tuple of (pipeline_id, pipeline_dir), where pipeline_id has the form “0001_abc123” or “0001_name_abc123”.

delete_pipeline(pipeline_id: str) → None

Delete the pipeline directory and its manifest.

Parameters:

pipeline_id – Pipeline ID to delete

extract_all_generator_choices(prediction: Dict[str, Any], instantiate: bool = False, verbose: bool = False) → List[Any]

Extract all generator choices from a prediction’s pipeline manifest.

Similar to extract_generator_choice but returns all choices at once.

Parameters:
  • prediction – Prediction dictionary with ‘pipeline_uid’ field.

  • instantiate – If True, deserialize all choice values into Python objects. If False, return raw JSON values.

  • verbose – If True, print debug information.

Returns:

List of choice values (JSON or instantiated objects). Empty list if no choices are available.

Example

>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> all_choices = manager.extract_all_generator_choices(top_pred)
>>> # all_choices = ["StandardScaler", {'class': '...', 'params': {...}}]

extract_generator_choice(prediction: Dict[str, Any], choice_index: int, instantiate: bool = False, verbose: bool = False) → Any | None

Extract a specific generator choice from a prediction’s pipeline manifest.

Given a prediction (from predictions.top() or similar), loads the corresponding pipeline manifest and returns the generator choice at the specified index.

Generator choices are stored in the manifest’s generator_choices field, which is a list of dicts like:

[{"_or_": "StandardScaler"}, {"_range_": 18}, {"_or_": {…}}]

This method allows extracting the value of a specific choice, either as the raw JSON node (for re-use in pipeline specs) or as an instantiated Python object.
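A minimal sketch of the index lookup, working on an already-loaded manifest dict instead of a prediction. It assumes every entry in generator_choices is a single-key dict as in the list above, and it leaves out the instantiate step; choice_value and all_choice_values are hypothetical helpers, not nirs4all functions.

```python
from typing import Any, Dict, List, Optional


def choice_value(manifest: Dict[str, Any], choice_index: int) -> Optional[Any]:
    """Hypothetical helper: raw JSON value of one generator choice, or None."""
    choices: List[Dict[str, Any]] = manifest.get("generator_choices") or []
    if not 0 <= choice_index < len(choices):
        return None  # out of range, or no generator_choices stored
    # Each choice is assumed to hold exactly one key, e.g. "_or_" or "_range_".
    (value,) = choices[choice_index].values()
    return value


def all_choice_values(manifest: Dict[str, Any]) -> List[Any]:
    """Hypothetical helper: every choice value; [] when none are stored."""
    choices = manifest.get("generator_choices") or []
    return [choice_value(manifest, i) for i in range(len(choices))]


manifest = {"generator_choices": [{"_or_": "StandardScaler"}, {"_range_": 18}]}
print(choice_value(manifest, 0))    # StandardScaler
print(choice_value(manifest, 5))    # None
print(all_choice_values(manifest))  # ['StandardScaler', 18]
```

Returning None rather than raising mirrors the None cases listed under Returns for this method.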

Parameters:
  • prediction – Prediction dictionary with ‘pipeline_uid’ field.

  • choice_index – Index of the choice in the generator_choices list (0-based).

  • instantiate – If True, deserialize the choice value into a Python object. If False, return the raw JSON value.

  • verbose – If True, print debug information.

Returns:

The choice value (JSON or instantiated object), or None if:
  • The prediction has no pipeline_uid

  • The manifest doesn’t exist or has no generator_choices

  • The choice_index is out of range

Example

>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> # Get raw JSON of first choice
>>> scaler_spec = manager.extract_generator_choice(top_pred, 0)
>>> # scaler_spec = "sklearn.preprocessing._data.StandardScaler"
>>>
>>> # Get instantiated object
>>> scaler = manager.extract_generator_choice(top_pred, 0, instantiate=True)
>>> # scaler = StandardScaler()
>>>
>>> # Get second choice (e.g., model spec)
>>> model_spec = manager.extract_generator_choice(top_pred, 1)
>>> # model_spec = {'class': '...PLSRegression', 'params': {'n_components': 3}}

extract_top_preprocessings(predictions: List[Dict[str, Any]], top_k: int = 3, step_name: str = 'feature_augmentation', exclude_scalers: bool = True, verbose: bool = False) → List[List[Any]]

Extract top K unique preprocessing pipelines from ranked predictions.

Given a list of predictions (typically from predictions.top()), extracts the preprocessing pipeline that was actually used for each prediction by parsing the display string and deserializing the transformers.

Iterates through the predictions in order until top_k unique preprocessings are found. This returns the best-performing unique preprocessings even when several top predictions share the same preprocessing (e.g., different folds).

This method is designed for pipeline chaining: run pipeline 1, get top predictions, extract their preprocessings, use in pipeline 2.
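The "first top_k unique" scan can be sketched on display strings alone. This hypothetical helper splits each 'preprocessings' string on ">", optionally drops scaler steps, and skips chains it has already collected. It does not deserialize transformers, and SCALER_NAMES is an assumed name list, not the rule nirs4all uses to detect scalers.

```python
from typing import Any, Dict, List

# Hypothetical set of step names treated as scalers; not taken from nirs4all.
SCALER_NAMES = {"MinMax", "MinMaxScaler", "StandardScaler", "RobustScaler"}


def top_unique_preprocessings(
    predictions: List[Dict[str, Any]],
    top_k: int = 3,
    exclude_scalers: bool = True,
) -> List[List[str]]:
    """Return up to top_k unique step-name chains, best prediction first."""
    seen = set()
    result: List[List[str]] = []
    for pred in predictions:  # assumed already sorted best-first
        steps = [s for s in pred.get("preprocessings", "").split(">") if s]
        if exclude_scalers:
            steps = [s for s in steps if s not in SCALER_NAMES]
        key = tuple(steps)
        if not steps or key in seen:
            continue  # e.g. another fold of an already-collected pipeline
        seen.add(key)
        result.append(steps)
        if len(result) == top_k:
            break
    return result


preds = [
    {"preprocessings": "ExtendedMSC>Detr>MinMax"},
    {"preprocessings": "ExtendedMSC>Detr>MinMax"},  # same chain, another fold
    {"preprocessings": "SNV"},
    {"preprocessings": "MSC>FirstDer"},
]
print(top_unique_preprocessings(preds, top_k=2))  # [['ExtendedMSC', 'Detr'], ['SNV']]
```

In this sketch, deduplication happens after scaler removal, so two chains that differ only in their scaler count as one.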

Parameters:
  • predictions – List of prediction dictionaries, typically from predictions.top(n=…, rank_metric="rmse"), sorted by score (best first). Each prediction must have a ‘preprocessings’ key holding the display string (e.g., “ExtendedMSC>Detr>MinMax”).

  • top_k – Number of unique preprocessings to extract. Will iterate through all predictions until this many unique preprocessings are found.

  • step_name – Unused, kept for backward compatibility.

  • exclude_scalers – If True, remove scaler transformers from each pipeline.

  • verbose – If True, print tracing information.

Returns:

List of up to top_k unique preprocessing pipelines. Each pipeline is a list of transformer instances ready for use in pipeline config.

Example

>>> manager = ManifestManager(runs_dir)
>>> top_preds = predictions.top(n=50, rank_metric="rmse")  # Get many predictions
>>> top_pp = manager.extract_top_preprocessings(top_preds, top_k=3)
>>> # top_pp = [[ExtendedMSC(), Detrend()], [SNV()], [MSC(), FirstDer()]]
>>> # Use in next pipeline:
>>> pipeline = [{"feature_augmentation": {"_or_": top_pp}}, ...]

get_artifact_path(content_hash: str) → Path

Get path for content-addressed artifact.
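The artifacts/objects/<hash[:2]>/<hash> layout shards objects by the first two hex characters of their hash, which keeps any single directory from growing too large. In this sketch, the root directory, the absence of a file extension, and the use of SHA-256 are all assumptions; the documentation does not say which hash function nirs4all uses.

```python
import hashlib
from pathlib import Path


def artifact_path(root: Path, content_hash: str) -> Path:
    """Shard objects by the first two hash characters: objects/<hash[:2]>/<hash>."""
    return root / "artifacts" / "objects" / content_hash[:2] / content_hash


# SHA-256 is an assumption here; the docs do not name the hash function.
digest = hashlib.sha256(b"serialized model bytes").hexdigest()
path = artifact_path(Path("results"), digest)
print(path.parent.name == digest[:2])  # True
```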

Parameters:

content_hash – Content hash of artifact

Returns:

Path to artifact in artifacts/objects/<hash[:2]>/<hash>

get_artifacts_list(manifest: Dict[str, Any]) → List[Dict[str, Any]]

Get artifacts as a flat list regardless of schema version.

Parameters:

manifest – Manifest dictionary

Returns:

List of artifact metadata dictionaries

get_latest_execution_trace(pipeline_id: str) → ExecutionTrace | None

Get the most recent execution trace for a pipeline.

Parameters:

pipeline_id – Pipeline ID

Returns:

Most recent ExecutionTrace or None if none exist

get_next_pipeline_number(run_dir: Path | None = None) → int

Get the next sequential pipeline number for workspace runs.

Counts existing pipeline directories (excluding _binaries).

Parameters:

run_dir – Run directory to count pipelines in. If None, uses results_dir.

Returns:

Next number (e.g., 1, 2, 3…)
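A minimal sketch of count-based numbering combined with the pipeline ID format documented for create_pipeline. It assumes every subdirectory other than _binaries is a pipeline and that numbers are zero-padded to four digits; next_pipeline_number and make_pipeline_id are hypothetical names.

```python
import tempfile
from pathlib import Path
from typing import Optional


def next_pipeline_number(run_dir: Path) -> int:
    """Count existing pipeline directories, ignoring _binaries, and add one."""
    existing = [p for p in run_dir.iterdir() if p.is_dir() and p.name != "_binaries"]
    return len(existing) + 1


def make_pipeline_id(number: int, pipeline_hash: str, name: Optional[str] = None) -> str:
    """Format an ID such as "0001_abc123" or "0001_name_abc123"."""
    parts = [f"{number:04d}"]
    if name:
        parts.append(name)
    parts.append(pipeline_hash)
    return "_".join(parts)


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp)
    for d in ("0001_abc123", "0002_def456", "_binaries"):
        (run_dir / d).mkdir()
    n = next_pipeline_number(run_dir)

print(n, make_pipeline_id(n, "9f8e7d"))  # 3 0003_9f8e7d
```

Because this sketch derives the number from a count rather than from the highest existing prefix, deleting a pipeline directory could cause a number to be reused.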

get_pipeline_path(pipeline_id: str) → Path

Get the directory path for a pipeline.

Parameters:

pipeline_id – Pipeline ID

Returns:

Path to pipeline directory

static get_schema_version(manifest: Dict[str, Any]) → str

Detect manifest schema version.

Parameters:

manifest – Manifest dictionary

Returns:

Schema version string (“1.0” or “2.0”)
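A sketch of version detection and flattening. It assumes the version is inferred from the shape of the "artifacts" field: a list in v1, and a dict with an "items" list in v2, as described for append_artifacts. The real method may read an explicit version field instead. schema_version and artifacts_list are hypothetical helpers.

```python
from typing import Any, Dict, List


def schema_version(manifest: Dict[str, Any]) -> str:
    """Infer "1.0" or "2.0" from the shape of the artifacts field (assumption)."""
    artifacts = manifest.get("artifacts")
    if isinstance(artifacts, dict) and "items" in artifacts:
        return "2.0"
    return "1.0"


def artifacts_list(manifest: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return artifacts as a flat list for either schema."""
    artifacts = manifest.get("artifacts")
    if isinstance(artifacts, dict):
        return list(artifacts.get("items", []))  # v2: {"items": [...]}
    return list(artifacts or [])  # v1: plain list (or missing)


v1 = {"artifacts": [{"name": "scaler"}]}
v2 = {"artifacts": {"items": [{"name": "scaler"}]}}
print(schema_version(v1), schema_version(v2))    # 1.0 2.0
print(artifacts_list(v1) == artifacts_list(v2))  # True
```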

list_all_pipelines() → List[Dict[str, Any]]

List all pipelines in this run.

Returns:

List of pipeline info dictionaries

list_execution_traces(pipeline_id: str) → List[str]

List all execution trace IDs for a pipeline.

Parameters:

pipeline_id – Pipeline ID

Returns:

List of trace IDs

list_pipelines() → List[str]

List all pipeline IDs in this run.

Returns:

List of pipeline IDs (e.g., [“0001_abc123”, “0002_def456”])

load_execution_trace(pipeline_id: str, trace_id: str) → ExecutionTrace | None

Load a specific execution trace from the pipeline manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • trace_id – Trace ID to load

Returns:

ExecutionTrace instance or None if not found

load_manifest(pipeline_id: str) → dict

Load a manifest YAML file.

Parameters:

pipeline_id – Pipeline ID (e.g., “0001_abc123”)

Returns:

Manifest dictionary

Raises:

FileNotFoundError – If manifest doesn’t exist

pipeline_exists(pipeline_id: str) → bool

Check if a pipeline exists.

Parameters:

pipeline_id – Pipeline ID

Returns:

True if manifest exists

save_execution_trace(pipeline_id: str, trace: ExecutionTrace) → None

Save an execution trace to the pipeline manifest.

Execution traces record the exact path through the pipeline that produced a prediction, enabling deterministic replay for prediction, transfer, and export.

Parameters:
  • pipeline_id – Pipeline ID

  • trace – ExecutionTrace instance to save

Note

The trace is stored in the manifest under “execution_traces” keyed by trace_id.
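A minimal sketch of that storage scheme, using plain dicts in place of ExecutionTrace objects. The "created_at" field used to pick the most recent trace is an assumption, as are the helper names save_trace, list_trace_ids, and latest_trace.

```python
from typing import Any, Dict, List, Optional


def save_trace(manifest: Dict[str, Any], trace: Dict[str, Any]) -> None:
    """Store (or replace) a trace under manifest["execution_traces"][trace_id]."""
    manifest.setdefault("execution_traces", {})[trace["trace_id"]] = trace


def list_trace_ids(manifest: Dict[str, Any]) -> List[str]:
    return list(manifest.get("execution_traces", {}))


def latest_trace(manifest: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Most recent trace by the hypothetical ISO-8601 "created_at" field."""
    traces = manifest.get("execution_traces", {})
    if not traces:
        return None
    return max(traces.values(), key=lambda t: t["created_at"])


manifest: Dict[str, Any] = {}
save_trace(manifest, {"trace_id": "t1", "created_at": "2024-05-01T10:00:00"})
save_trace(manifest, {"trace_id": "t2", "created_at": "2024-05-02T09:30:00"})
print(list_trace_ids(manifest))            # ['t1', 't2']
print(latest_trace(manifest)["trace_id"])  # t2
```

Keying by trace_id means saving a trace with an existing ID replaces the earlier entry rather than appending a duplicate.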

save_manifest(pipeline_id: str, manifest: dict) → None

Save a manifest YAML file.

Parameters:
  • pipeline_id – Pipeline ID (e.g., “0001_abc123”)

  • manifest – Complete manifest dictionary

update_manifest(pipeline_id: str, updates: dict) → None

Update specific fields in a manifest.

Parameters:
  • pipeline_id – Pipeline ID

  • updates – Dictionary of fields to update

upgrade_manifest_to_v2(pipeline_id: str) → None

Upgrade a v1 manifest to v2 format in place.

Parameters:

pipeline_id – Pipeline ID to upgrade

class nirs4all.pipeline.storage.PipelineLibrary(workspace_path: Path)

Bases: object

Manages reusable pipeline templates in the workspace library.

Templates are stored in workspace/library/{category}/{template_name}/. Each template contains:
  • pipeline.json: the pipeline configuration

  • metadata.json: description, tags, performance metrics, etc.

  • README.md: human-readable documentation
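A minimal sketch of creating the workspace/library/{category}/{template_name}/ layout with the standard library. The README text and the JSON formatting are placeholders, and write_template is a hypothetical helper, not PipelineLibrary.save_template.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def write_template(workspace: Path, category: str, name: str,
                   pipeline: Any, metadata: Dict[str, Any]) -> Path:
    """Create workspace/library/{category}/{name}/ with the three template files."""
    template_dir = workspace / "library" / category / name
    template_dir.mkdir(parents=True, exist_ok=True)
    (template_dir / "pipeline.json").write_text(json.dumps(pipeline, indent=2), encoding="utf-8")
    (template_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    # Placeholder README; the real content and format are not documented here.
    readme = f"# {name}\n\n{metadata.get('description', '')}\n"
    (template_dir / "README.md").write_text(readme, encoding="utf-8")
    return template_dir


with tempfile.TemporaryDirectory() as tmp:
    template_dir = write_template(
        Path(tmp), "preprocessing", "snv_pls",
        pipeline=[{"class": "SNV"}, {"class": "PLSRegression"}],
        metadata={"description": "SNV then PLS", "tags": ["regression"]},
    )
    files = sorted(p.name for p in template_dir.iterdir())
    rel = template_dir.relative_to(tmp).as_posix()

print(rel)    # library/preprocessing/snv_pls
print(files)  # ['README.md', 'metadata.json', 'pipeline.json']
```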

copy_from_pipeline(pipeline_dir: Path, name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, extract_metrics: bool = True) → Path

Copy a successful pipeline to the library as a template.

Parameters:
  • pipeline_dir – Path to pipeline directory (e.g., workspace/runs/…/0001_hash/)

  • name – Template name

  • category – Category for the template

  • description – Description of the template

  • tags – Tags for filtering

  • extract_metrics – Whether to extract metrics from manifest

Returns:

Path to saved template

delete_template(name: str, category: str | None = None) → None

Delete a template from the library.

Parameters:
  • name – Template name

  • category – Optional category to search in

Raises:

FileNotFoundError – If template not found

export_template(name: str, export_path: str | Path, category: str | None = None) → Path

Export a template to a standalone directory.

Parameters:
  • name – Template name

  • export_path – Destination directory

  • category – Optional category to search in

Returns:

Path to exported template

get_template_metadata(name: str, category: str | None = None) → Dict[str, Any]

Get metadata for a template.

Parameters:
  • name – Template name

  • category – Optional category to search in

Returns:

Metadata dictionary

import_template(import_path: str | Path, category: str = 'general', overwrite: bool = False) → Path

Import a template from an external directory.

Parameters:
  • import_path – Path to template directory

  • category – Category to import into

  • overwrite – Whether to overwrite existing template

Returns:

Path to imported template in library

list_templates(category: str | None = None, tags: List[str] | None = None) → List[Dict[str, Any]]

List all templates, optionally filtered by category and tags.

Parameters:
  • category – Optional category filter

  • tags – Optional list of tags to filter by (matches any)

Returns:

List of template metadata dictionaries
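The "matches any" tag rule can be sketched as a set intersection. This assumes each metadata dictionary carries "category" and "tags" keys; filter_templates is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional


def filter_templates(templates: List[Dict[str, Any]], category: Optional[str] = None,
                     tags: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Keep templates in `category` (if given) sharing at least one tag (if given)."""
    wanted = set(tags or [])
    result = []
    for meta in templates:
        if category is not None and meta.get("category") != category:
            continue
        if wanted and not wanted.intersection(meta.get("tags", [])):
            continue  # no overlap with the requested tags
        result.append(meta)
    return result


templates = [
    {"name": "snv_only", "category": "preprocessing", "tags": ["snv"]},
    {"name": "pls_base", "category": "modeling", "tags": ["pls", "regression"]},
    {"name": "cnn_cls", "category": "modeling", "tags": ["classification"]},
]
print([t["name"] for t in filter_templates(templates, category="modeling")])
# ['pls_base', 'cnn_cls']
print([t["name"] for t in filter_templates(templates, tags=["regression", "snv"])])
# ['snv_only', 'pls_base']
```

In this sketch an empty tag list behaves like None and applies no tag filter.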

load_template(name: str, category: str | None = None) → Dict[str, Any]

Load a pipeline template by name.

Parameters:
  • name – Template name

  • category – Optional category to search in (searches all if None)

Returns:

Pipeline configuration dictionary

Raises:

FileNotFoundError – If template not found

save_template(pipeline_config: Dict[str, Any], name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, metrics: Dict[str, float] | None = None, notes: str = '', overwrite: bool = False) → Path

Save a pipeline configuration as a reusable template.

Parameters:
  • pipeline_config – Pipeline configuration dictionary

  • name – Template name (will be sanitized for filesystem)

  • category – Category/folder (e.g., “preprocessing”, “modeling”, “full_pipeline”)

  • description – Short description of the template

  • tags – List of tags for filtering (e.g., [“classification”, “neural_network”])

  • metrics – Performance metrics (e.g., {“accuracy”: 0.95, “f1”: 0.93})

  • notes – Additional notes or usage instructions

  • overwrite – Whether to overwrite existing template

Returns:

Path to saved template directory

Raises:

class nirs4all.pipeline.storage.PipelineWriter(pipeline_dir: Path, save_charts: bool = True)

Bases: object

Writes files within a pipeline directory.

Focused responsibility: File I/O operations for a single pipeline.

get_path() → Path

Get the pipeline directory path.

Returns:

Path to pipeline directory

list_files() → list[str]

List all files in the pipeline directory.

Returns:

List of filenames

save_file(filename: str, content: str, overwrite: bool = True, encoding: str = 'utf-8', warn_on_overwrite: bool = True) → Path

Save a text file to the pipeline directory.

Parameters:
  • filename – Name of file to create

  • content – Text content to write

  • overwrite – Whether to overwrite existing files

  • encoding – Text encoding (default: utf-8)

  • warn_on_overwrite – Whether to warn when overwriting

Returns:

Path to saved file

Raises:

FileExistsError – If file exists and overwrite=False
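A sketch of the documented overwrite rules: raise FileExistsError when overwrite=False, otherwise optionally warn before replacing the file. Using the warnings module for the warning is an assumption (the class may log instead), and save_text is a hypothetical helper.

```python
import tempfile
import warnings
from pathlib import Path


def save_text(pipeline_dir: Path, filename: str, content: str, overwrite: bool = True,
              encoding: str = "utf-8", warn_on_overwrite: bool = True) -> Path:
    """Hypothetical stand-in for PipelineWriter.save_file's overwrite rules."""
    path = pipeline_dir / filename
    if path.exists():
        if not overwrite:
            raise FileExistsError(f"{path} already exists")
        if warn_on_overwrite:
            warnings.warn(f"Overwriting {path}", stacklevel=2)
    path.write_text(content, encoding=encoding)
    return path


with tempfile.TemporaryDirectory() as tmp:
    pipeline_dir = Path(tmp)
    save_text(pipeline_dir, "notes.txt", "first")
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        save_text(pipeline_dir, "notes.txt", "second")  # warns, then overwrites
    try:
        save_text(pipeline_dir, "notes.txt", "third", overwrite=False)
        refused = False
    except FileExistsError:
        refused = True
    final_text = (pipeline_dir / "notes.txt").read_text(encoding="utf-8")

print(len(caught), refused, final_text)  # 1 True second
```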

save_json(filename: str, data: Any, overwrite: bool = True, indent: int | None = 2) → Path

Save data as JSON file.

Parameters:
  • filename – Name of file (will add .json if missing)

  • data – Data to serialize as JSON

  • overwrite – Whether to overwrite existing files

  • indent – JSON indentation (None for compact)

Returns:

Path to saved file
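A sketch of the ".json" suffix rule and the compact indent=None mode, using only the standard json module. The overwrite check is omitted here, and save_json_file is a hypothetical name.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Optional


def save_json_file(pipeline_dir: Path, filename: str, data: Any,
                   indent: Optional[int] = 2) -> Path:
    """Hypothetical helper: append ".json" if missing, then serialize."""
    if not filename.endswith(".json"):
        filename += ".json"
    path = pipeline_dir / filename
    path.write_text(json.dumps(data, indent=indent), encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    path = save_json_file(Path(tmp), "metrics", {"rmse": 0.12}, indent=None)
    name, text = path.name, path.read_text(encoding="utf-8")

print(name, text)  # metrics.json {"rmse": 0.12}
```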

save_output(name: str, data: bytes | str, extension: str = '.png') → Path | None

Save a human-readable output file (chart, report, etc.).

Parameters:
  • name – Output name (e.g., “2D_Chart”)

  • data – Binary or text data to save

  • extension – File extension (e.g., “.png”, “.csv”, “.txt”)

Returns:

Path to saved file, or None if save_charts=False

nirs4all.pipeline.storage.PredictionResolver

alias of TargetResolver

class nirs4all.pipeline.storage.SimulationSaver(base_path: str | Path | None = None, save_artifacts: bool = True, save_charts: bool = True)

Bases: object

Manages saving simulation results with a flat pipeline structure.

Acts as a facade coordinating:
  • PipelineWriter: file I/O within pipeline directories

  • WorkspaceExporter: exporting best results

  • PredictionResolver: resolving prediction targets

Works with ManifestManager to create: base_path/NNNN_hash/files

cleanup(confirm: bool = False) → None

Remove the current simulation directory and all its contents.

Parameters:

confirm – Must be True to actually delete files

Raises:

RuntimeError – If no pipeline is registered or confirm is False

export_best_for_dataset(dataset_name: str, workspace_path: Path, runs_dir: Path, mode: str = 'predictions') → Path | None

Export best results for a dataset to exports/ folder.

Delegates to WorkspaceExporter.

Creates exports/{dataset_name}/ with best predictions, pipeline config, and charts. Files are renamed to include run date for tracking.

Parameters:
  • dataset_name – Dataset name (matches global prediction JSON filename)

  • workspace_path – Workspace root path (unused, maintained for compatibility)

  • runs_dir – Runs directory path

  • mode – Export mode - “predictions”, “template”, “trained”, or “full”

Returns:

Path to export directory, or None if no predictions found

export_best_prediction(predictions_file: Path, exports_dir: Path, dataset_name: str, run_date: str, pipeline_id: str, custom_name: str = None) → Path

Export predictions CSV to best_predictions/ folder with optional custom name.

Delegates to WorkspaceExporter.

Parameters:
  • predictions_file – Path to predictions.csv

  • exports_dir – Workspace exports directory (unused, maintained for compatibility)

  • dataset_name – Metadata for naming

  • run_date – Metadata for naming

  • pipeline_id – Metadata for naming

  • custom_name – Optional custom name for export

Returns:

Path to exported CSV

export_pipeline_full(pipeline_dir: Path, exports_dir: Path, dataset_name: str, run_date: str, custom_name: str = None) → Path

Export full pipeline results to flat structure with optional custom name.

Delegates to WorkspaceExporter.

Parameters:
  • pipeline_dir – Path to pipeline (NNNN_hash/ or NNNN_pipelinename_hash/)

  • exports_dir – Workspace exports directory (unused, maintained for compatibility)

  • dataset_name – Dataset name

  • run_date – Run date (YYYYMMDD)

  • custom_name – Optional custom name for export

Returns:

Path to exported directory

property exporter: WorkspaceExporter

Get or create WorkspaceExporter instance.

get_metadata() → Dict[str, Any]

Get the current metadata.

get_path() → Path

Get the current pipeline path.

Delegates to PipelineWriter.

get_predict_targets(prediction_obj: Dict[str, Any] | str)

Get target variable names for prediction from a prediction object.

Delegates to PredictionResolver.

list_files() → Dict[str, List[str]]

List all saved files in the current pipeline.

Returns:

Dictionary with file lists

persist_artifact(step_number: int, name: str, obj: Any, format_hint: str | None = None, branch_id: int | None = None, branch_name: str | None = None) → Dict[str, Any]

Persist artifact using the serializer with content-addressed storage.

NOTE: This is for internal binary artifacts (models, transformers, etc.). For human-readable outputs (charts, reports), use save_output() instead.

Parameters:
  • step_number – Pipeline step number

  • name – Artifact name (for reference)

  • obj – Object to persist

  • format_hint – Optional format hint for serializer

  • branch_id – Optional branch ID for pipeline branching

  • branch_name – Optional human-readable branch name

Returns:

Artifact metadata dictionary (empty if save_artifacts=False)
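A minimal sketch of content-addressed persistence: the object is serialized, hashed, and written to objects/<hash[:2]>/<hash> only if that file does not already exist, so identical objects share one file. Here pickle and SHA-256 stand in for the nirs4all serializer and hash, and the returned metadata keys are hypothetical.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict


def persist(objects_dir: Path, step_number: int, name: str, obj: Any) -> Dict[str, Any]:
    """Write obj once per distinct content and return hypothetical metadata."""
    payload = pickle.dumps(obj)  # stand-in for the nirs4all serializer
    content_hash = hashlib.sha256(payload).hexdigest()
    path = objects_dir / content_hash[:2] / content_hash
    if not path.exists():  # identical content is stored only once
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(payload)
    return {"step": step_number, "name": name, "hash": content_hash, "size": len(payload)}


with tempfile.TemporaryDirectory() as tmp:
    objects_dir = Path(tmp) / "objects"
    first = persist(objects_dir, 1, "scaler", {"mean": [0.0, 1.0]})
    second = persist(objects_dir, 3, "scaler_copy", {"mean": [0.0, 1.0]})
    n_files = sum(1 for p in objects_dir.rglob("*") if p.is_file())

print(first["hash"] == second["hash"], n_files)  # True 1
```

Deduplication in this scheme relies on the serializer producing identical bytes for equal objects, which holds for simple values like the dict above but is not guaranteed for every object type.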

register(pipeline_id: str) → Path

Register a pipeline ID and make its directory the current pipeline directory.

Parameters:

pipeline_id – Pipeline ID from ManifestManager (e.g., “0001_abc123”)

Returns:

Path to the pipeline directory

register_workspace(workspace_root: Path, dataset_name: str, pipeline_hash: str, run_name: str = None, pipeline_name: str = None) → Path

Register a pipeline in the workspace structure, with optional custom names.

Creates:
  • Without custom names: workspace_root/runs/{dataset}/NNNN_{hash}/

  • With run_name: workspace_root/runs/{dataset}_{runname}/NNNN_{hash}/

  • With pipeline_name: workspace_root/runs/{dataset}/NNNN_{pipelinename}_{hash}/

  • With both: workspace_root/runs/{dataset}_{runname}/NNNN_{pipelinename}_{hash}/

All pipelines for a dataset are stored in the same folder regardless of date.

Returns:

Full path to pipeline directory
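The four naming cases for register_workspace reduce to two independent choices (run folder and pipeline folder), sketched here as a pure path builder. It does not create directories, and the four-digit NNNN padding is assumed from the “0001_abc123” examples; workspace_pipeline_dir is a hypothetical name.

```python
from pathlib import Path
from typing import Optional


def workspace_pipeline_dir(workspace_root: Path, dataset: str, number: int,
                           pipeline_hash: str, run_name: Optional[str] = None,
                           pipeline_name: Optional[str] = None) -> Path:
    """Build runs/{dataset}[_{runname}]/NNNN[_{pipelinename}]_{hash}."""
    run_folder = f"{dataset}_{run_name}" if run_name else dataset
    if pipeline_name:
        stem = f"{number:04d}_{pipeline_name}_{pipeline_hash}"
    else:
        stem = f"{number:04d}_{pipeline_hash}"
    return workspace_root / "runs" / run_folder / stem


root = Path("ws")
print(workspace_pipeline_dir(root, "corn", 1, "abc123").as_posix())
# ws/runs/corn/0001_abc123
print(workspace_pipeline_dir(root, "corn", 1, "abc123", "baseline", "pls").as_posix())
# ws/runs/corn_baseline/0001_pls_abc123
```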

property resolver: TargetResolver

Get or create PredictionResolver instance.

save_file(filename: str, content: str, overwrite: bool = True, encoding: str = 'utf-8', warn_on_overwrite: bool = True) → Path

Save a text file to the pipeline directory.

Delegates to PipelineWriter.

save_json(filename: str, data: Any, overwrite: bool = True, indent: int | None = 2) → Path

Save data as JSON file.

Delegates to PipelineWriter.

save_output(step_number: int, name: str, data: bytes | str, extension: str = '.png') → Path | None

Save a human-readable output file (chart, report, etc.).

Delegates to PipelineWriter.

Parameters:
  • step_number – Pipeline step number (unused, kept for compatibility)

  • name – Output name (e.g., “2D_Chart”)

  • data – Binary or text data to save

  • extension – File extension (e.g., “.png”, “.csv”, “.txt”)

Returns:

Path to saved file, or None if save_charts=False

property writer: PipelineWriter

Get or create PipelineWriter instance.

class nirs4all.pipeline.storage.TargetResolver(workspace_path: Path)

Bases: object

Resolves prediction targets for predict mode.

Focused responsibility: Finding and resolving prediction targets by ID.

Note: For the comprehensive Phase 3 resolver that handles multiple source types (prediction dict, folder, Run, artifact_id, bundle), see nirs4all.pipeline.resolver.PredictionResolver.

find_best_for_config(config_path: str) → Dict[str, Any] | None

Find the best prediction for a given config path.

Parameters:

config_path – Path to pipeline configuration

Returns:

Best prediction metadata, or None if not found

find_prediction_by_id(prediction_id: str) → Dict[str, Any] | None

Search for a prediction by ID in global predictions databases.

Uses direct ID filtering for O(1) lookup per file instead of O(N) iteration.

Parameters:

prediction_id – Unique prediction identifier

Returns:

Prediction metadata dict, or None if not found

resolve_target(prediction_obj: Dict[str, Any] | str) → tuple[str, Dict[str, Any] | None]

Resolve prediction object to config path and model metadata.

Parameters:

prediction_obj – Either:
  • a dict with ‘config_path’ and optional model metadata, or

  • a string: a config path or a prediction ID

Returns:

Tuple of (config_path, target_model_metadata)

Raises:

ValueError – If prediction ID not found or invalid input
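A sketch of the input dispatch for resolve_target. Telling a config path from a prediction ID by looking for a path separator is an assumption, as is returning the input dict itself as the model metadata; resolve is a hypothetical helper, and find_by_id stands in for find_prediction_by_id.

```python
from typing import Any, Callable, Dict, Optional, Tuple, Union

Metadata = Dict[str, Any]


def resolve(prediction_obj: Union[Metadata, str],
            find_by_id: Callable[[str], Optional[Metadata]]) -> Tuple[str, Optional[Metadata]]:
    """Return (config_path, model_metadata) for the documented input forms."""
    if isinstance(prediction_obj, dict):
        if "config_path" not in prediction_obj:
            raise ValueError("prediction dict has no 'config_path'")
        return prediction_obj["config_path"], prediction_obj
    if isinstance(prediction_obj, str):
        if "/" in prediction_obj or "\\" in prediction_obj:  # looks like a path (assumption)
            return prediction_obj, None
        found = find_by_id(prediction_obj)
        if found is None:
            raise ValueError(f"prediction ID not found: {prediction_obj}")
        return found["config_path"], found
    raise ValueError(f"invalid input type: {type(prediction_obj).__name__}")


predictions_db = {"a1b2c3": {"config_path": "runs/corn/0001_abc123", "model_name": "PLS"}}
print(resolve("a1b2c3", predictions_db.get)[0])  # runs/corn/0001_abc123
print(resolve("runs/corn/0002_def456", predictions_db.get))  # ('runs/corn/0002_def456', None)
```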

class nirs4all.pipeline.storage.WorkspaceExporter(workspace_path: Path)

Bases: object

Exports best results to workspace exports/ folder.

Focused responsibility: Export functionality for best pipelines.

export_best_for_dataset(dataset_name: str, runs_dir: Path, mode: str = 'predictions') → Path | None

Export best results for a dataset to exports/ folder.

Parameters:
  • dataset_name – Dataset name

  • runs_dir – Runs directory path

  • mode – Export mode - “predictions”, “template”, “trained”, or “full”

Returns:

Path to export directory, or None if no predictions found

export_best_prediction(predictions_file: Path, dataset_name: str, run_date: str = None, pipeline_id: str = None, custom_name: str | None = None) → Path

Export predictions CSV to best_predictions/ folder.

Parameters:
  • predictions_file – Path to predictions.csv

  • dataset_name – Dataset name

  • run_date – Run date (deprecated, kept for compatibility)

  • pipeline_id – Pipeline identifier

  • custom_name – Optional custom name for export

Returns:

Path to exported CSV

export_pipeline_full(pipeline_dir: Path, dataset_name: str, run_date: str = None, custom_name: str | None = None) → Path

Export full pipeline results to a flat structure.

Parameters:
  • pipeline_dir – Path to pipeline (NNNN_hash/)

  • dataset_name – Dataset name

  • run_date – Run date (YYYYMMDD; deprecated, kept for compatibility)

  • custom_name – Optional custom name for export

Returns:

Path to exported directory