nirs4all.pipeline.storage package
Subpackages
- nirs4all.pipeline.storage.artifacts package
- Submodules
- nirs4all.pipeline.storage.artifacts.artifact_loader module
- nirs4all.pipeline.storage.artifacts.artifact_persistence module
- nirs4all.pipeline.storage.artifacts.artifact_registry module
- nirs4all.pipeline.storage.artifacts.operator_chain module
- nirs4all.pipeline.storage.artifacts.types module
- nirs4all.pipeline.storage.artifacts.utils module
- Module contents
- Submodules
Submodules
- nirs4all.pipeline.storage.io module
- nirs4all.pipeline.storage.io_exporter module
- nirs4all.pipeline.storage.io_resolver module
- nirs4all.pipeline.storage.io_writer module
- nirs4all.pipeline.storage.library module
- nirs4all.pipeline.storage.manifest_manager module
Module contents
Storage and persistence layer for pipeline artifacts and results.
This module handles all I/O operations, including:
  - Pipeline artifact storage (binary models, scalers, etc.)
  - Manifest and metadata management
  - Workspace exports and imports
  - Prediction resolution and saving
  - Pipeline template library
- class nirs4all.pipeline.storage.ManifestManager(results_dir: str | Path)[source]
Bases: object
Manage pipeline manifests with sequential numbering.
This class handles:
  - Creating new pipelines with sequential numbering (0001_hash, 0002_hash)
  - Saving/loading pipeline manifests
  - Content-addressed artifact storage
- add_run_checkpoint(run_id: str, result_id: str, metadata: Dict[str, Any] | None = None) None[source]
Add a checkpoint recording a completed result.
- Parameters:
run_id – Run identifier
result_id – Result/pipeline identifier that completed
metadata – Optional additional metadata for the checkpoint
- append_artifacts(pipeline_id: str, artifacts: List[dict]) None[source]
Append artifacts to a pipeline manifest.
Supports both v1 (list) and v2 (dict with items) artifact formats.
- Parameters:
pipeline_id – Pipeline ID
artifacts – List of artifact metadata dictionaries
- append_artifacts_v2(pipeline_id: str, records: List[Dict[str, Any]]) None[source]
Append v2 ArtifactRecords to a pipeline manifest.
- Parameters:
pipeline_id – Pipeline ID
records – List of ArtifactRecord instances or dicts
- append_prediction(pipeline_id: str, prediction: dict) None[source]
Append a prediction record to pipeline manifest.
- Parameters:
pipeline_id – Pipeline ID
prediction – Prediction metadata dictionary
- artifact_exists(content_hash: str) bool[source]
Check if artifact exists in storage.
- Parameters:
content_hash – Content hash to check
- Returns:
True if artifact exists
- create_pipeline(name: str, dataset: str, pipeline_config: dict, pipeline_hash: str, metadata: dict | None = None, generator_choices: List[Dict[str, Any]] | None = None, dataset_info: Dict[str, Any] | None = None) tuple[str, Path][source]
Create new pipeline with sequential numbering.
- Parameters:
name – Pipeline name (for human reference)
dataset – Dataset name
pipeline_config – Pipeline configuration dict
pipeline_hash – Hash of pipeline config (first 6 chars)
metadata – Optional initial metadata
generator_choices – List of generator choices that produced this pipeline. Each choice is a dict like {“_or_”: selected_value} or {“_range_”: 18}.
dataset_info – Optional dataset version info for run tracking (Phase 7). Expected format: {“path”: str, “hash”: str, “version_at_run”: int}
- Returns:
Tuple of (pipeline_id, pipeline_dir). The pipeline_id format is “0001_abc123” or “0001_name_abc123”.
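The ID format described above can be illustrated with a small standalone sketch. It does not call nirs4all; `format_pipeline_id` is a hypothetical helper, and treating the name as optional is an assumption made only to show both documented ID shapes.

```python
from typing import Optional


def format_pipeline_id(number: int, pipeline_hash: str, name: Optional[str] = None) -> str:
    """Build an ID shaped like "0001_abc123" or "0001_name_abc123" (hypothetical helper)."""
    short_hash = pipeline_hash[:6]  # the docs describe the hash as the first 6 chars
    parts = [f"{number:04d}"]       # zero-padded sequential number
    if name:
        parts.append(name)          # optional human-readable segment (assumption)
    parts.append(short_hash)
    return "_".join(parts)


if __name__ == "__main__":
    print(format_pipeline_id(1, "abc123"))
    print(format_pipeline_id(1, "abc123", name="name"))
```

In the real class, create_pipeline() returns the ID together with the pipeline directory, so callers should not need to build IDs by hand.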
- create_run_manifest(run_id: str, name: str, templates: List[Dict[str, Any]], datasets: List[Dict[str, Any]], config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None) Path[source]
Create a run-level manifest for tracking experiment sessions.
A run combines pipeline templates with datasets and generates results for every combination of expanded pipeline configurations and datasets.
- Parameters:
run_id – Unique identifier for the run (e.g., “2026-01-09_experiment_abc123”)
name – Human-readable name for the run
templates – List of template metadata dictionaries with:
  - id: Template ID
  - name: Template name
  - original_template: The unexpanded template definition
  - expansion_count: Number of configs generated
  - file_path: Optional path to template file
datasets – List of dataset metadata dictionaries with:
  - name: Dataset name
  - path: Dataset path
  - hash: Dataset content hash
  - file_size: File size in bytes
  - n_samples, n_features: Dimensions
  - y_stats: Target variable statistics
config – Run configuration (cv_folds, metric, etc.)
metadata – Additional user metadata
- Returns:
Path to the created run manifest file
Example
>>> manager = ManifestManager(results_dir)
>>> run_path = manager.create_run_manifest(
...     run_id="2026-01-09_protein_abc123",
...     name="Protein Content Optimization",
...     templates=[{
...         "id": "t1",
...         "name": "PLS Variants",
...         "original_template": [{"_or_": [...]}, ...],
...         "expansion_count": 6
...     }],
...     datasets=[{
...         "name": "Wheat",
...         "path": "/data/wheat.csv",
...         "hash": "abc123"
...     }],
...     config={"cv_folds": 5, "metric": "r2"}
... )
- delete_pipeline(pipeline_id: str) None[source]
Delete pipeline directory and manifest.
- Parameters:
pipeline_id – Pipeline ID to delete
- extract_all_generator_choices(prediction: Dict[str, Any], instantiate: bool = False, verbose: bool = False) List[Any][source]
Extract all generator choices from a prediction’s pipeline manifest.
Similar to extract_generator_choice but returns all choices at once.
- Parameters:
prediction – Prediction dictionary with ‘pipeline_uid’ field.
instantiate – If True, deserialize all choice values into Python objects. If False, return raw JSON values.
verbose – If True, print debug information.
- Returns:
List of choice values (JSON or instantiated objects). Empty list if no choices are available.
Example
>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> all_choices = manager.extract_all_generator_choices(top_pred)
>>> # all_choices = ["StandardScaler", {'class': '...', 'params': {...}}]
- extract_generator_choice(prediction: Dict[str, Any], choice_index: int, instantiate: bool = False, verbose: bool = False) Any | None[source]
Extract a specific generator choice from a prediction’s pipeline manifest.
Given a prediction (from predictions.top() or similar), loads the corresponding pipeline manifest and returns the generator choice at the specified index.
Generator choices are stored in the manifest’s generator_choices field, which is a list of dicts like:
[{“_or_”: “StandardScaler”}, {“_range_”: 18}, {“_or_”: {…}}]
This method allows extracting the value of a specific choice, either as the raw JSON node (for re-use in pipeline specs) or as an instantiated Python object.
- Parameters:
prediction – Prediction dictionary with ‘pipeline_uid’ field.
choice_index – Index of the choice in the generator_choices list (0-based).
instantiate – If True, deserialize the choice value into a Python object. If False, return the raw JSON value.
verbose – If True, print debug information.
- Returns:
The choice value (JSON or instantiated object), or None if:
  - The prediction has no pipeline_uid
  - The manifest doesn’t exist or has no generator_choices
  - The choice_index is out of range
Example
>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> # Get raw JSON of first choice
>>> scaler_spec = manager.extract_generator_choice(top_pred, 0)
>>> # scaler_spec = "sklearn.preprocessing._data.StandardScaler"
>>>
>>> # Get instantiated object
>>> scaler = manager.extract_generator_choice(top_pred, 0, instantiate=True)
>>> # scaler = StandardScaler()
>>>
>>> # Get second choice (e.g., model spec)
>>> model_spec = manager.extract_generator_choice(top_pred, 1)
>>> # model_spec = {'class': '...PLSRegression', 'params': {'n_components': 3}}
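The lookup rules above (0-based index, None when nothing can be returned) can be sketched without the library. `pick_choice` is a hypothetical stand-in that works on an in-memory generator_choices list; it assumes each choice is a single-key dict whose value is the selected item, and it skips manifest loading and the instantiate option entirely.

```python
from typing import Any, Dict, List, Optional


def pick_choice(generator_choices: List[Dict[str, Any]], choice_index: int) -> Optional[Any]:
    """Return the value stored in the choice at `choice_index`, or None (hypothetical helper)."""
    if not generator_choices:
        return None  # no choices recorded
    if choice_index < 0 or choice_index >= len(generator_choices):
        return None  # index out of range
    choice = generator_choices[choice_index]
    # Assumption: a choice looks like {"_or_": value} or {"_range_": value},
    # so the first (and only) dict value is what the caller wants.
    return next(iter(choice.values()), None)


if __name__ == "__main__":
    choices = [{"_or_": "StandardScaler"}, {"_range_": 18}]
    print(pick_choice(choices, 0))
    print(pick_choice(choices, 1))
    print(pick_choice(choices, 5))
```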
- extract_top_preprocessings(predictions: List[Dict[str, Any]], top_k: int = 3, step_name: str = 'feature_augmentation', exclude_scalers: bool = True, verbose: bool = False) List[List[Any]][source]
Extract top K unique preprocessing pipelines from ranked predictions.
Given a list of predictions (typically from predictions.top()), extracts the preprocessing pipeline that was actually used for each prediction by parsing the display string and deserializing the transformers.
Iterates through ALL predictions until top_k unique preprocessings are found. This ensures we get the best-performing unique preprocessings even if the top predictions share the same preprocessing (e.g., different folds).
This method is designed for pipeline chaining: run pipeline 1, get top predictions, extract their preprocessings, use in pipeline 2.
- Parameters:
predictions – List of prediction dictionaries, typically from predictions.top(n=…, rank_metric="rmse"). Should be sorted by score (best first). Each prediction must have a ‘preprocessings’ field: a display string (e.g., “ExtendedMSC>Detr>MinMax”).
top_k – Number of unique preprocessings to extract. Will iterate through all predictions until this many unique preprocessings are found.
step_name – Unused, kept for backward compatibility.
exclude_scalers – If True, remove scaler transformers from each pipeline.
verbose – If True, print tracing information.
- Returns:
List of up to top_k unique preprocessing pipelines. Each pipeline is a list of transformer instances ready for use in pipeline config.
Example
>>> manager = ManifestManager(runs_dir)
>>> top_preds = predictions.top(n=50, rank_metric="rmse")  # Get many predictions
>>> top_pp = manager.extract_top_preprocessings(top_preds, top_k=3)
>>> # top_pp = [[ExtendedMSC(), Detrend()], [SNV()], [MSC(), FirstDer()]]
>>> # Use in next pipeline:
>>> pipeline = [{"feature_augmentation": {"_or_": top_pp}}, ...]
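The "iterate until top_k unique preprocessings are found" behaviour can be sketched in isolation. This is not the library's code: `top_unique_preprocessings` is a hypothetical helper that only de-duplicates the ‘preprocessings’ display strings and splits them on ">"; it returns step names rather than deserialized transformer instances and does not implement exclude_scalers.

```python
from typing import Any, Dict, List


def top_unique_preprocessings(predictions: List[Dict[str, Any]], top_k: int = 3) -> List[List[str]]:
    """Collect the first `top_k` distinct preprocessing chains from ranked predictions."""
    if top_k <= 0:
        return []
    seen = set()
    unique: List[List[str]] = []
    for pred in predictions:  # predictions are assumed to be sorted best-first
        display = pred.get("preprocessings", "")
        if not display or display in seen:
            continue  # skip missing strings and repeats (e.g., other folds)
        seen.add(display)
        unique.append(display.split(">"))  # "A>B" becomes ["A", "B"]
        if len(unique) == top_k:
            break
    return unique


if __name__ == "__main__":
    ranked = [
        {"preprocessings": "ExtendedMSC>Detr"},
        {"preprocessings": "ExtendedMSC>Detr"},  # same chain, different fold
        {"preprocessings": "SNV"},
        {"preprocessings": "MSC>FirstDer"},
    ]
    print(top_unique_preprocessings(ranked, top_k=3))
```

Keeping the first occurrence of each chain preserves the ranking order, which is why the input should already be sorted by score.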
- get_artifact_path(content_hash: str) Path[source]
Get path for content-addressed artifact.
- Parameters:
content_hash – Content hash of artifact
- Returns:
Path to artifact in artifacts/objects/<hash[:2]>/<hash>
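A content-addressed layout of this kind, where the first two hash characters select a subdirectory, can be sketched as follows. The helper names are hypothetical, and the use of SHA-256 is an assumption: this page does not say which hash function nirs4all uses.

```python
import hashlib
from pathlib import Path


def content_hash(data: bytes) -> str:
    """Hash raw bytes; SHA-256 is an assumption, not the documented algorithm."""
    return hashlib.sha256(data).hexdigest()


def artifact_path(results_dir: Path, digest: str) -> Path:
    """Map a digest to <results_dir>/artifacts/objects/<digest[:2]>/<digest>."""
    return results_dir / "artifacts" / "objects" / digest[:2] / digest


if __name__ == "__main__":
    digest = content_hash(b"serialized model bytes")
    print(artifact_path(Path("results"), digest))
```

Because the path depends only on the content, identical artifacts map to the same file, which is what makes a check like artifact_exists(content_hash) possible.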
- get_artifacts_list(manifest: Dict[str, Any]) List[Dict[str, Any]][source]
Get artifacts as a flat list regardless of schema version.
- Parameters:
manifest – Manifest dictionary
- Returns:
List of artifact metadata dictionaries
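Flattening across schema versions can be pictured with the sketch below. It relies on the description given for append_artifacts (v1 stores a list, v2 stores a dict with items); the top-level key name "artifacts" and the helper `artifacts_as_list` are assumptions for illustration.

```python
from typing import Any, Dict, List


def artifacts_as_list(manifest: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return artifact entries as a flat list for either manifest shape (hypothetical helper)."""
    artifacts = manifest.get("artifacts") or []  # key name is an assumption
    if isinstance(artifacts, dict):
        # v2-style: entries live under "items"
        return list(artifacts.get("items") or [])
    # v1-style: the field is already a list
    return list(artifacts)


if __name__ == "__main__":
    v1 = {"artifacts": [{"name": "scaler"}]}
    v2 = {"artifacts": {"items": [{"name": "scaler"}, {"name": "model"}]}}
    print(len(artifacts_as_list(v1)), len(artifacts_as_list(v2)))
```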
- get_latest_execution_trace(pipeline_id: str) ExecutionTrace | None[source]
Get the most recent execution trace for a pipeline.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
Most recent ExecutionTrace or None if none exist
- get_next_pipeline_number(run_dir: Path | None = None) int[source]
Get next sequential pipeline number for workspace runs.
Counts existing pipeline directories (excludes _binaries).
- Parameters:
run_dir – Run directory to count pipelines in. If None, uses results_dir.
- Returns:
Next number (e.g., 1, 2, 3…)
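The counting rule can be sketched with pathlib alone. `next_pipeline_number` below is a hypothetical re-implementation for illustration; returning "count plus one" and returning 1 for a missing directory are assumptions consistent with the description, not confirmed behaviour.

```python
import tempfile
from pathlib import Path


def next_pipeline_number(run_dir: Path) -> int:
    """Count pipeline directories, ignoring _binaries, and return the next number."""
    if not run_dir.exists():
        return 1  # assumption: an empty or missing run starts at 1
    existing = [p for p in run_dir.iterdir() if p.is_dir() and p.name != "_binaries"]
    return len(existing) + 1


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        run_dir = Path(tmp)
        (run_dir / "0001_abc123").mkdir()
        (run_dir / "_binaries").mkdir()  # excluded from the count
        print(next_pipeline_number(run_dir))
```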
- get_pipeline_path(pipeline_id: str) Path[source]
Get the directory path for a pipeline.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
Path to pipeline directory
- static get_schema_version(manifest: Dict[str, Any]) str[source]
Detect manifest schema version.
- Parameters:
manifest – Manifest dictionary
- Returns:
Schema version string (“1.0” or “2.0”)
- list_all_pipelines() List[Dict[str, Any]][source]
List all pipelines in this run.
- Returns:
List of pipeline info dictionaries
- list_execution_traces(pipeline_id: str) List[str][source]
List all execution trace IDs for a pipeline.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
List of trace IDs
- list_pipelines() List[str][source]
List all pipeline IDs in this run.
- Returns:
List of pipeline IDs (e.g., [“0001_abc123”, “0002_def456”])
- list_runs() List[Dict[str, Any]][source]
List all runs in this workspace.
- Returns:
List of run summary dictionaries
- load_execution_trace(pipeline_id: str, trace_id: str) ExecutionTrace | None[source]
Load a specific execution trace from the pipeline manifest.
- Parameters:
pipeline_id – Pipeline ID
trace_id – Trace ID to load
- Returns:
ExecutionTrace instance or None if not found
- load_manifest(pipeline_id: str) dict[source]
Load manifest YAML file.
- Parameters:
pipeline_id – Pipeline ID (e.g., “0001_abc123”)
- Returns:
Manifest dictionary
- Raises:
FileNotFoundError – If manifest doesn’t exist
- load_run_manifest(run_id: str) Dict[str, Any] | None[source]
Load a run manifest.
- Parameters:
run_id – Run identifier
- Returns:
Run manifest dictionary or None if not found
- pipeline_exists(pipeline_id: str) bool[source]
Check if a pipeline exists.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
True if manifest exists
- save_execution_trace(pipeline_id: str, trace: ExecutionTrace) None[source]
Save an execution trace to the pipeline manifest.
Execution traces record the exact path through the pipeline that produced a prediction, enabling deterministic replay for prediction, transfer, and export.
- Parameters:
pipeline_id – Pipeline ID
trace – ExecutionTrace instance to save
Note
The trace is stored in the manifest under “execution_traces” keyed by trace_id.
- save_manifest(pipeline_id: str, manifest: dict) None[source]
Save manifest YAML file.
- Parameters:
pipeline_id – Pipeline ID (e.g., “0001_abc123”)
manifest – Complete manifest dictionary
- update_manifest(pipeline_id: str, updates: dict) None[source]
Update specific fields in a manifest.
- Parameters:
pipeline_id – Pipeline ID
updates – Dictionary of fields to update
- update_run_status(run_id: str, status: str, summary_updates: Dict[str, Any] | None = None) None[source]
Update run status and optionally summary.
- Parameters:
run_id – Run identifier
status – New status (queued, running, completed, failed, paused, cancelled)
summary_updates – Optional summary field updates
- class nirs4all.pipeline.storage.PipelineLibrary(workspace_path: Path)[source]
Bases: object
Manages reusable pipeline templates in the workspace library.
Templates are stored in workspace/library/{category}/{template_name}/. Each template contains:
  - pipeline.json: The pipeline configuration
  - metadata.json: Description, tags, performance metrics, etc.
  - README.md: Human-readable documentation
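The on-disk layout of a template can be expressed as a few path joins. `template_layout` is a hypothetical helper that only mirrors the directory and file names listed above; it does not create or read anything.

```python
from pathlib import Path
from typing import Dict


def template_layout(workspace: Path, category: str, name: str) -> Dict[str, Path]:
    """Return the expected paths for one library template (hypothetical helper)."""
    template_dir = workspace / "library" / category / name
    return {
        "dir": template_dir,
        "pipeline": template_dir / "pipeline.json",   # pipeline configuration
        "metadata": template_dir / "metadata.json",   # description, tags, metrics
        "readme": template_dir / "README.md",         # human-readable documentation
    }


if __name__ == "__main__":
    for key, path in template_layout(Path("workspace"), "general", "pls_baseline").items():
        print(key, path)
```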
- copy_from_pipeline(pipeline_dir: Path, name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, extract_metrics: bool = True) Path[source]
Copy a successful pipeline to the library as a template.
- Parameters:
pipeline_dir – Path to pipeline directory (e.g., workspace/runs/…/0001_hash/)
name – Template name
category – Category for the template
description – Description of the template
tags – Tags for filtering
extract_metrics – Whether to extract metrics from manifest
- Returns:
Path to saved template
- delete_template(name: str, category: str | None = None) None[source]
Delete a template from the library.
- Parameters:
name – Template name
category – Optional category to search in
- Raises:
FileNotFoundError – If template not found
- export_template(name: str, export_path: str | Path, category: str | None = None) Path[source]
Export a template to a standalone directory.
- Parameters:
name – Template name
export_path – Destination directory
category – Optional category to search in
- Returns:
Path to exported template
- get_template_metadata(name: str, category: str | None = None) Dict[str, Any][source]
Get metadata for a template.
- Parameters:
name – Template name
category – Optional category to search in
- Returns:
Metadata dictionary
- import_template(import_path: str | Path, category: str = 'general', overwrite: bool = False) Path[source]
Import a template from an external directory.
- Parameters:
import_path – Path to template directory
category – Category to import into
overwrite – Whether to overwrite existing template
- Returns:
Path to imported template in library
- list_templates(category: str | None = None, tags: List[str] | None = None) List[Dict[str, Any]][source]
List all templates, optionally filtered by category and tags.
- Parameters:
category – Optional category filter
tags – Optional list of tags to filter by (matches any)
- Returns:
List of template metadata dictionaries
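The "matches any" tag semantics can be sketched over plain metadata dictionaries. The helper `filter_templates` and the metadata keys "category" and "tags" are assumptions for illustration; the real metadata schema is not shown on this page.

```python
from typing import Any, Dict, List, Optional


def filter_templates(
    templates: List[Dict[str, Any]],
    category: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Keep templates in `category` (if given) that carry at least one requested tag."""
    wanted = set(tags or [])
    selected = []
    for meta in templates:
        if category is not None and meta.get("category") != category:
            continue
        # "matches any": one shared tag is enough
        if wanted and not wanted.intersection(meta.get("tags", [])):
            continue
        selected.append(meta)
    return selected


if __name__ == "__main__":
    library = [
        {"name": "pls", "category": "modeling", "tags": ["regression"]},
        {"name": "snv", "category": "preprocessing", "tags": ["scatter"]},
    ]
    print(filter_templates(library, tags=["regression", "classification"]))
```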
- load_template(name: str, category: str | None = None) Dict[str, Any][source]
Load a pipeline template by name.
- Parameters:
name – Template name
category – Optional category to search in (searches all if None)
- Returns:
Pipeline configuration dictionary
- Raises:
FileNotFoundError – If template not found
- save_template(pipeline_config: Dict[str, Any], name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, metrics: Dict[str, float] | None = None, notes: str = '', overwrite: bool = False) Path[source]
Save a pipeline configuration as a reusable template.
- Parameters:
pipeline_config – Pipeline configuration dictionary
name – Template name (will be sanitized for filesystem)
category – Category/folder (e.g., “preprocessing”, “modeling”, “full_pipeline”)
description – Short description of the template
tags – List of tags for filtering (e.g., [“classification”, “neural_network”])
metrics – Performance metrics (e.g., {“accuracy”: 0.95, “f1”: 0.93})
notes – Additional notes or usage instructions
overwrite – Whether to overwrite existing template
- Returns:
Path to saved template directory
- Raises:
FileExistsError – If template exists and overwrite=False
ValueError – If name contains invalid characters
- class nirs4all.pipeline.storage.PipelineWriter(pipeline_dir: Path, save_charts: bool = True)[source]
Bases: object
Writes files within a pipeline directory.
Focused responsibility: File I/O operations for a single pipeline.
- list_files() list[str][source]
List all files in the pipeline directory.
- Returns:
List of filenames
- save_file(filename: str, content: str, overwrite: bool = True, encoding: str = 'utf-8', warn_on_overwrite: bool = True) Path[source]
Save a text file to the pipeline directory.
- Parameters:
filename – Name of file to create
content – Text content to write
overwrite – Whether to overwrite existing files
encoding – Text encoding (default: utf-8)
warn_on_overwrite – Whether to warn when overwriting
- Returns:
Path to saved file
- Raises:
FileExistsError – If file exists and overwrite=False
- save_json(filename: str, data: Any, overwrite: bool = True, indent: int | None = 2) Path[source]
Save data as JSON file.
- Parameters:
filename – Name of file (will add .json if missing)
data – Data to serialize as JSON
overwrite – Whether to overwrite existing files
indent – JSON indentation (None for compact)
- Returns:
Path to saved file
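The filename and indent handling can be sketched with the standard library. `save_json_sketch` is a hypothetical stand-in, not PipelineWriter itself; raising FileExistsError when overwrite is False is borrowed from the save_file description and is an assumption for save_json.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Optional


def save_json_sketch(
    directory: Path,
    filename: str,
    data: Any,
    overwrite: bool = True,
    indent: Optional[int] = 2,
) -> Path:
    """Write `data` as JSON, appending .json to the filename when it is missing."""
    if not filename.endswith(".json"):
        filename += ".json"
    target = directory / filename
    if target.exists() and not overwrite:
        raise FileExistsError(f"{target} already exists")  # assumed behaviour
    # indent=None makes json.dumps emit a single line
    target.write_text(json.dumps(data, indent=indent), encoding="utf-8")
    return target


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = save_json_sketch(Path(tmp), "metrics", {"rmse": 0.12})
        print(path.name)
```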
- save_output(name: str, data: bytes | str, extension: str = '.png') Path | None[source]
Save a human-readable output file (chart, report, etc.).
- Parameters:
name – Output name (e.g., “2D_Chart”)
data – Binary or text data to save
extension – File extension (e.g., “.png”, “.csv”, “.txt”)
- Returns:
Path to saved file, or None if save_charts=False
- nirs4all.pipeline.storage.PredictionResolver
alias of TargetResolver
- class nirs4all.pipeline.storage.SimulationSaver(base_path: str | Path | None = None, save_artifacts: bool = True, save_charts: bool = True)[source]
Bases: object
Manages saving simulation results with flat pipeline structure.
Acts as a facade coordinating:
  - PipelineWriter: File I/O within pipeline directories
  - WorkspaceExporter: Exporting best results
  - PredictionResolver: Resolving prediction targets
Works with ManifestManager to create: base_path/NNNN_hash/files
- cleanup(confirm: bool = False) None[source]
Remove the current simulation directory and all its contents.
- Parameters:
confirm – Must be True to actually delete files
- Raises:
RuntimeError – If not registered or confirm is False
- export_best_for_dataset(dataset_name: str, workspace_path: Path, runs_dir: Path, mode: str = 'predictions') Path | None[source]
Export best results for a dataset to exports/ folder.
Delegates to WorkspaceExporter.
Creates exports/{dataset_name}/ with best predictions, pipeline config, and charts. Files are renamed to include run date for tracking.
- Parameters:
dataset_name – Dataset name (matches global prediction JSON filename)
workspace_path – Workspace root path (unused, maintained for compatibility)
runs_dir – Runs directory path
mode – Export mode - “predictions”, “template”, “trained”, or “full”
- Returns:
Path to export directory, or None if no predictions found
- export_best_prediction(predictions_file: Path, exports_dir: Path, dataset_name: str, run_date: str, pipeline_id: str, custom_name: str = None) Path[source]
Export predictions CSV to best_predictions/ folder with optional custom name.
Delegates to WorkspaceExporter.
- Parameters:
predictions_file – Path to predictions.csv
exports_dir – Workspace exports directory (unused, maintained for compatibility)
dataset_name – Metadata for naming
run_date – Metadata for naming
pipeline_id – Metadata for naming
custom_name – Optional custom name for export
- Returns:
Path to exported CSV
- export_pipeline_full(pipeline_dir: Path, exports_dir: Path, dataset_name: str, run_date: str, custom_name: str = None) Path[source]
Export full pipeline results to flat structure with optional custom name.
Delegates to WorkspaceExporter.
- Parameters:
pipeline_dir – Path to pipeline (NNNN_hash/ or NNNN_pipelinename_hash/)
exports_dir – Workspace exports directory (unused, maintained for compatibility)
dataset_name – Dataset name
run_date – Run date (YYYYMMDD)
custom_name – Optional custom name for export
- Returns:
Path to exported directory
- property exporter: WorkspaceExporter
Get or create WorkspaceExporter instance.
- get_predict_targets(prediction_obj: Dict[str, Any] | str)[source]
Get target variable names for prediction from a prediction object.
Delegates to PredictionResolver.
- list_files() Dict[str, List[str]][source]
List all saved files in the current pipeline.
- Returns:
Dictionary with file lists
- persist_artifact(step_number: int, name: str, obj: Any, format_hint: str | None = None, branch_id: int | None = None, branch_name: str | None = None) Dict[str, Any][source]
Persist artifact using the serializer with content-addressed storage.
NOTE: This is for internal binary artifacts (models, transformers, etc.). For human-readable outputs (charts, reports), use save_output() instead.
- Parameters:
step_number – Pipeline step number
name – Artifact name (for reference)
obj – Object to persist
format_hint – Optional format hint for serializer
branch_id – Optional branch ID for pipeline branching
branch_name – Optional human-readable branch name
- Returns:
Artifact metadata dictionary (empty if save_artifacts=False)
- register(pipeline_id: str) Path[source]
Register a pipeline ID and set current directory.
- Parameters:
pipeline_id – Pipeline ID from ManifestManager (e.g., “0001_abc123”)
- Returns:
Path to the pipeline directory
- register_workspace(workspace_root: Path, dataset_name: str, pipeline_hash: str, run_name: str = None, pipeline_name: str = None) Path[source]
Register pipeline in workspace structure with optional custom names.
Creates:
  - Without custom names: workspace_root/runs/{dataset}/NNNN_{hash}/
  - With run_name: workspace_root/runs/{dataset}_{runname}/NNNN_{hash}/
  - With pipeline_name: workspace_root/runs/{dataset}/NNNN_{pipelinename}_{hash}/
  - With both: workspace_root/runs/{dataset}_{runname}/NNNN_{pipelinename}_{hash}/
All pipelines for a dataset are stored in the same folder regardless of date.
- Returns:
Full path to pipeline directory
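The four directory shapes listed for register_workspace can be reproduced with a short path-building sketch. `workspace_pipeline_dir` is a hypothetical helper; it takes the sequential number as an explicit argument, whereas the real method presumably obtains it from ManifestManager, and it creates nothing on disk.

```python
from pathlib import Path
from typing import Optional


def workspace_pipeline_dir(
    workspace_root: Path,
    dataset_name: str,
    pipeline_hash: str,
    number: int,
    run_name: Optional[str] = None,
    pipeline_name: Optional[str] = None,
) -> Path:
    """Compose runs/{dataset}[_{runname}]/NNNN[_{pipelinename}]_{hash} under the workspace."""
    run_folder = f"{dataset_name}_{run_name}" if run_name else dataset_name
    prefix = f"{number:04d}"  # NNNN: zero-padded sequential number
    if pipeline_name:
        pipeline_folder = f"{prefix}_{pipeline_name}_{pipeline_hash}"
    else:
        pipeline_folder = f"{prefix}_{pipeline_hash}"
    return Path(workspace_root) / "runs" / run_folder / pipeline_folder


if __name__ == "__main__":
    print(workspace_pipeline_dir(Path("ws"), "wheat", "abc123", 1))
    print(workspace_pipeline_dir(Path("ws"), "wheat", "abc123", 1, run_name="trial", pipeline_name="pls"))
```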
- property resolver: TargetResolver
Get or create PredictionResolver instance.
- save_file(filename: str, content: str, overwrite: bool = True, encoding: str = 'utf-8', warn_on_overwrite: bool = True) Path[source]
Save a text file to the pipeline directory.
Delegates to PipelineWriter.
- save_json(filename: str, data: Any, overwrite: bool = True, indent: int | None = 2) Path[source]
Save data as JSON file.
Delegates to PipelineWriter.
- save_output(step_number: int, name: str, data: bytes | str, extension: str = '.png') Path | None[source]
Save a human-readable output file (chart, report, etc.).
Delegates to PipelineWriter.
- Parameters:
step_number – Pipeline step number (unused, kept for compatibility)
name – Output name (e.g., “2D_Chart”)
data – Binary or text data to save
extension – File extension (e.g., “.png”, “.csv”, “.txt”)
- Returns:
Path to saved file, or None if save_charts=False
- property writer: PipelineWriter
Get or create PipelineWriter instance.
- class nirs4all.pipeline.storage.TargetResolver(workspace_path: Path)[source]
Bases: object
Resolves prediction targets for predict mode.
Focused responsibility: Finding and resolving prediction targets by ID.
Note: For the comprehensive Phase 3 resolver that handles multiple source types (prediction dict, folder, Run, artifact_id, bundle), see nirs4all.pipeline.resolver.PredictionResolver.
- find_best_for_config(config_path: str) Dict[str, Any] | None[source]
Find the best prediction for a given config path.
- Parameters:
config_path – Path to pipeline configuration
- Returns:
Best prediction metadata, or None if not found
- find_prediction_by_id(prediction_id: str) Dict[str, Any] | None[source]
Search for a prediction by ID in global predictions databases.
Uses direct ID filtering for O(1) lookup per file instead of O(N) iteration.
- Parameters:
prediction_id – Unique prediction identifier
- Returns:
Prediction metadata dict, or None if not found
- resolve_target(prediction_obj: Dict[str, Any] | str) tuple[str, Dict[str, Any] | None][source]
Resolve prediction object to config path and model metadata.
- Parameters:
prediction_obj – Either:
  - Dict with ‘config_path’ and optional model metadata
  - String: config path or prediction ID
- Returns:
Tuple of (config_path, target_model_metadata)
- Raises:
ValueError – If prediction ID not found or invalid input
- class nirs4all.pipeline.storage.WorkspaceExporter(workspace_path: Path)[source]
Bases: object
Exports best results to workspace exports/ folder.
Focused responsibility: Export functionality for best pipelines.
- export_best_for_dataset(dataset_name: str, runs_dir: Path, mode: str = 'predictions') Path | None[source]
Export best results for a dataset to exports/ folder.
- Parameters:
dataset_name – Dataset name
runs_dir – Runs directory path
mode – Export mode - “predictions”, “template”, “trained”, or “full”
- Returns:
Path to export directory, or None if no predictions found
- export_best_prediction(predictions_file: Path, dataset_name: str, run_date: str = None, pipeline_id: str = None, custom_name: str | None = None) Path[source]
Export predictions CSV to best_predictions/ folder.
- Parameters:
predictions_file – Path to predictions.csv
dataset_name – Dataset name
run_date – Run date - deprecated, kept for compatibility
pipeline_id – Pipeline identifier
custom_name – Optional custom name for export
- Returns:
Path to exported CSV
- export_pipeline_full(pipeline_dir: Path, dataset_name: str, run_date: str = None, custom_name: str | None = None) Path[source]
Export full pipeline results to flat structure.
- Parameters:
pipeline_dir – Path to pipeline (NNNN_hash/)
dataset_name – Dataset name
run_date – Run date (YYYYMMDD) - deprecated, kept for compatibility
custom_name – Optional custom name for export
- Returns:
Path to exported directory