nirs4all.pipeline.storage.manifest_manager module
Manifest Manager - Pipeline manifest and dataset index management
Manages pipeline manifests with sequential numbering and content-addressed artifacts. Provides centralized pipeline registration, lookup, and lifecycle management.
- Architecture (v2):
workspace/
├── binaries/<dataset>/          # Centralized artifact storage
│   ├── model_PLSRegression_abc123.joblib
│   └── transformer_StandardScaler_def456.pkl
└── runs/<dataset>/
    ├── 0001_pls_abc123/
    │   └── manifest.yaml        # References artifacts by path
    ├── 0002_rf_def456/
    │   └── manifest.yaml
    └── predictions.json         # Global predictions
- Manifest Schema Versions:
v1.0: Legacy format with flat artifacts list
v2.0: New format with structured artifacts section and schema_version field
- class nirs4all.pipeline.storage.manifest_manager.ManifestManager(results_dir: str | Path)
Bases: object
Manage pipeline manifests with sequential numbering.
This class handles:
- Creating new pipelines with sequential numbering (0001_hash, 0002_hash)
- Saving/loading pipeline manifests
- Content-addressed artifact storage
- append_artifacts(pipeline_id: str, artifacts: List[dict]) → None
Append artifacts to a pipeline manifest.
Supports both v1 (list) and v2 (dict with items) artifact formats.
- Parameters:
pipeline_id – Pipeline ID
artifacts – List of artifact metadata dictionaries
- append_artifacts_v2(pipeline_id: str, records: List[Dict[str, Any]]) → None
Append v2 ArtifactRecords to a pipeline manifest.
- Parameters:
pipeline_id – Pipeline ID
records – List of ArtifactRecord instances or dicts
- append_prediction(pipeline_id: str, prediction: dict) → None
Append a prediction record to pipeline manifest.
- Parameters:
pipeline_id – Pipeline ID
prediction – Prediction metadata dictionary
- artifact_exists(content_hash: str) → bool
Check if artifact exists in storage.
- Parameters:
content_hash – Content hash to check
- Returns:
True if artifact exists
- create_pipeline(name: str, dataset: str, pipeline_config: dict, pipeline_hash: str, metadata: dict | None = None, generator_choices: List[Dict[str, Any]] | None = None) → tuple[str, Path]
Create new pipeline with sequential numbering.
- Parameters:
name – Pipeline name (for human reference)
dataset – Dataset name
pipeline_config – Pipeline configuration dict
pipeline_hash – Hash of pipeline config (first 6 chars)
metadata – Optional initial metadata
generator_choices – List of generator choices that produced this pipeline. Each choice is a dict like {"_or_": selected_value} or {"_range_": 18}.
- Returns:
Tuple of (pipeline_id, pipeline_dir).
pipeline_id format: "0001_abc123" or "0001_name_abc123"
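The ID format described for create_pipeline can be sketched as follows. This is only an illustration under stated assumptions, not the library's implementation: `format_pipeline_id` is a hypothetical helper, and the rule that the name segment is omitted when the name is empty is an assumption.

```python
def format_pipeline_id(number: int, pipeline_hash: str, name: str = "") -> str:
    """Build an ID such as '0001_abc123' or '0001_name_abc123' (hypothetical helper)."""
    prefix = f"{number:04d}"        # zero-padded sequential number
    short_hash = pipeline_hash[:6]  # first 6 characters of the config hash
    # Assumption: the name segment is omitted when no name is given.
    parts = [prefix, name, short_hash] if name else [prefix, short_hash]
    return "_".join(parts)
```

Zero-padding keeps the pipeline directories in creation order when they are sorted by name.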
- delete_pipeline(pipeline_id: str) → None
Delete pipeline directory and manifest.
- Parameters:
pipeline_id – Pipeline ID to delete
- extract_all_generator_choices(prediction: Dict[str, Any], instantiate: bool = False, verbose: bool = False) → List[Any]
Extract all generator choices from a prediction’s pipeline manifest.
Similar to extract_generator_choice but returns all choices at once.
- Parameters:
prediction – Prediction dictionary with ‘pipeline_uid’ field.
instantiate – If True, deserialize all choice values into Python objects. If False, return raw JSON values.
verbose – If True, print debug information.
- Returns:
List of choice values (JSON or instantiated objects). Empty list if no choices are available.
Example
>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> all_choices = manager.extract_all_generator_choices(top_pred)
>>> # all_choices = ["StandardScaler", {'class': '...', 'params': {...}}]
- extract_generator_choice(prediction: Dict[str, Any], choice_index: int, instantiate: bool = False, verbose: bool = False) → Any | None
Extract a specific generator choice from a prediction’s pipeline manifest.
Given a prediction (from predictions.top() or similar), loads the corresponding pipeline manifest and returns the generator choice at the specified index.
Generator choices are stored in the manifest’s generator_choices field, which is a list of dicts like:
[{"_or_": "StandardScaler"}, {"_range_": 18}, {"_or_": {…}}]
This method allows extracting the value of a specific choice, either as the raw JSON node (for re-use in pipeline specs) or as an instantiated Python object.
- Parameters:
prediction – Prediction dictionary with ‘pipeline_uid’ field.
choice_index – Index of the choice in the generator_choices list (0-based).
instantiate – If True, deserialize the choice value into a Python object. If False, return the raw JSON value.
verbose – If True, print debug information.
- Returns:
The choice value (JSON or instantiated object), or None if:
- The prediction has no pipeline_uid
- The manifest doesn't exist or has no generator_choices
- The choice_index is out of range
Example
>>> manager = ManifestManager(runs_dir)
>>> top_pred = predictions.top(n=1)[0]
>>> # Get raw JSON of first choice
>>> scaler_spec = manager.extract_generator_choice(top_pred, 0)
>>> # scaler_spec = "sklearn.preprocessing._data.StandardScaler"
>>>
>>> # Get instantiated object
>>> scaler = manager.extract_generator_choice(top_pred, 0, instantiate=True)
>>> # scaler = StandardScaler()
>>>
>>> # Get second choice (e.g., model spec)
>>> model_spec = manager.extract_generator_choice(top_pred, 1)
>>> # model_spec = {'class': '...PLSRegression', 'params': {'n_components': 3}}
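To make the lookup concrete, here is a minimal sketch of reading one raw value out of a generator_choices list. It does not use nirs4all; `choice_value` is a hypothetical helper, and it assumes each choice is a single-key dict such as {"_or_": value}, as in the list shown in the description.

```python
from typing import Any, Dict, List, Optional


def choice_value(generator_choices: List[Dict[str, Any]], choice_index: int) -> Optional[Any]:
    """Return the raw value of one choice, or None if the index is out of range."""
    if not 0 <= choice_index < len(generator_choices):
        return None
    choice = generator_choices[choice_index]
    # Assumption: each choice is a single-key dict, e.g. {"_or_": value}.
    return next(iter(choice.values()), None)


choices = [{"_or_": "StandardScaler"}, {"_range_": 18}]
first = choice_value(choices, 0)    # raw value of the first choice
missing = choice_value(choices, 5)  # index out of range
```

Returning None rather than raising mirrors the documented behaviour for an out-of-range index; the instantiate option is not covered by this sketch.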
- extract_top_preprocessings(predictions: List[Dict[str, Any]], top_k: int = 3, step_name: str = 'feature_augmentation', exclude_scalers: bool = True, verbose: bool = False) → List[List[Any]]
Extract top K unique preprocessing pipelines from ranked predictions.
Given a list of predictions (typically from predictions.top()), extracts the preprocessing pipeline that was actually used for each prediction by parsing the display string and deserializing the transformers.
Iterates through ALL predictions until top_k unique preprocessings are found. This ensures we get the best-performing unique preprocessings even if the top predictions share the same preprocessing (e.g., different folds).
This method is designed for pipeline chaining: run pipeline 1, get top predictions, extract their preprocessings, use in pipeline 2.
- Parameters:
predictions – List of prediction dictionaries, typically from predictions.top(n=…, rank_metric="rmse"). Should be sorted by score (best first). Each prediction must have:
- 'preprocessings': display string (e.g., "ExtendedMSC>Detr>MinMax")
top_k – Number of unique preprocessings to extract. Will iterate through all predictions until this many unique preprocessings are found.
step_name – Unused, kept for backward compatibility.
exclude_scalers – If True, remove scaler transformers from each pipeline.
verbose – If True, print tracing information.
- Returns:
List of up to top_k unique preprocessing pipelines. Each pipeline is a list of transformer instances ready for use in pipeline config.
Example
>>> manager = ManifestManager(runs_dir)
>>> top_preds = predictions.top(n=50, rank_metric="rmse")  # Get many predictions
>>> top_pp = manager.extract_top_preprocessings(top_preds, top_k=3)
>>> # top_pp = [[ExtendedMSC(), Detrend()], [SNV()], [MSC(), FirstDer()]]
>>> # Use in next pipeline:
>>> pipeline = [{"feature_augmentation": {"_or_": top_pp}}, ...]
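The "iterate until top_k unique preprocessings are found" behaviour can be sketched without the library. The helper below is hypothetical: it assumes steps in the display string are separated by ">" (as in "ExtendedMSC>Detr>MinMax") and returns the short step names, whereas the real method deserializes them into transformer instances and can drop scalers.

```python
from typing import Any, Dict, List


def top_unique_preprocessings(predictions: List[Dict[str, Any]], top_k: int = 3) -> List[List[str]]:
    """Collect the first top_k distinct preprocessing chains from ranked predictions."""
    seen = set()
    unique: List[List[str]] = []
    for pred in predictions:
        if len(unique) >= top_k:
            break  # enough unique chains collected
        display = pred.get("preprocessings", "")
        if not display or display in seen:
            continue  # skip empty entries and repeats (e.g. other folds)
        seen.add(display)
        # Assumption: steps are separated by ">" in the display string.
        unique.append(display.split(">"))
    return unique


ranked = [
    {"preprocessings": "SNV>Detr"},
    {"preprocessings": "SNV>Detr"},  # same chain, different fold
    {"preprocessings": "MSC"},
    {"preprocessings": "ExtendedMSC>Detr>MinMax"},
]
result = top_unique_preprocessings(ranked, top_k=2)
```

Because the input is sorted best first, the first occurrence of each chain is also its best-scoring one.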
- get_artifact_path(content_hash: str) → Path
Get path for content-addressed artifact.
- Parameters:
content_hash – Content hash of artifact
- Returns:
Path to artifact in artifacts/objects/<hash[:2]>/<hash>
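A content-addressed layout of this kind, where the first two characters of the hash name a shard directory, could look like the sketch below. `artifact_path` and its `root` argument are hypothetical; how the library resolves the root directory is not shown in this reference.

```python
from pathlib import Path


def artifact_path(root: Path, content_hash: str) -> Path:
    """Map a content hash to <root>/artifacts/objects/<hash[:2]>/<hash> (hypothetical helper)."""
    shard = content_hash[:2]  # two-character shard directory
    return root / "artifacts" / "objects" / shard / content_hash


path = artifact_path(Path("workspace"), "abc123")
```

Sharding by hash prefix keeps any single directory from accumulating every artifact file.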
- get_artifacts_list(manifest: Dict[str, Any]) → List[Dict[str, Any]]
Get artifacts as a flat list regardless of schema version.
- Parameters:
manifest – Manifest dictionary
- Returns:
List of artifact metadata dictionaries
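A version-agnostic read of the artifacts section might be sketched as follows. This is an assumption-based illustration rather than the library's code: `artifacts_as_list` is a hypothetical name, and the "items" key follows the description of the v2 format as a "dict with items".

```python
from typing import Any, Dict, List


def artifacts_as_list(manifest: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return artifacts as a flat list for both v1 (list) and v2 (dict) layouts."""
    artifacts = manifest.get("artifacts", [])
    if isinstance(artifacts, dict):
        # Assumption: v2 manifests keep the records under an "items" key.
        return list(artifacts.get("items", []))
    return list(artifacts)


v1_manifest = {"artifacts": [{"name": "scaler"}]}
v2_manifest = {"schema_version": "2.0", "artifacts": {"items": [{"name": "model"}]}}
```

Callers can then iterate over artifacts without branching on the schema version themselves.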
- get_latest_execution_trace(pipeline_id: str) → ExecutionTrace | None
Get the most recent execution trace for a pipeline.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
Most recent ExecutionTrace or None if none exist
- get_next_pipeline_number(run_dir: Path | None = None) → int
Get next sequential pipeline number for workspace runs.
Counts existing pipeline directories (excludes _binaries).
- Parameters:
run_dir – Run directory to count pipelines in. If None, uses results_dir.
- Returns:
Next number (e.g., 1, 2, 3…)
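Counting existing pipeline directories to derive the next number can be sketched with the standard library alone. `next_pipeline_number` is a hypothetical stand-in, and treating a missing run directory as "no pipelines yet" is an assumption.

```python
import tempfile
from pathlib import Path


def next_pipeline_number(run_dir: Path) -> int:
    """Return 1 + the number of pipeline directories in run_dir, ignoring _binaries."""
    if not run_dir.exists():
        return 1  # assumption: a missing directory means no pipelines yet
    existing = [p for p in run_dir.iterdir() if p.is_dir() and p.name != "_binaries"]
    return len(existing) + 1


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp)
    (run_dir / "0001_pls_abc123").mkdir()
    (run_dir / "_binaries").mkdir()                   # excluded from the count
    (run_dir / "predictions.json").write_text("{}")   # files are not counted
    next_number = next_pipeline_number(run_dir)
```

A count-based scheme like this one would reuse a number if a pipeline directory were deleted; whether the library guards against that is not stated here.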
- get_pipeline_path(pipeline_id: str) → Path
Get the directory path for a pipeline.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
Path to pipeline directory
- static get_schema_version(manifest: Dict[str, Any]) → str
Detect manifest schema version.
- Parameters:
manifest – Manifest dictionary
- Returns:
Schema version string (“1.0” or “2.0”)
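Since the v2.0 format is described as carrying a schema_version field, detection could plausibly work as in this sketch. `detect_schema_version` is a hypothetical helper, and falling back to "1.0" when the field is absent is an assumption based on the module overview.

```python
from typing import Any, Dict


def detect_schema_version(manifest: Dict[str, Any]) -> str:
    """Return the manifest's schema version, defaulting to the legacy '1.0'."""
    version = manifest.get("schema_version")
    if version is not None:
        return str(version)
    # Assumption: legacy manifests have no schema_version field.
    return "1.0"
```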
- list_all_pipelines() → List[Dict[str, Any]]
List all pipelines in this run.
- Returns:
List of pipeline info dictionaries
- list_execution_traces(pipeline_id: str) → List[str]
List all execution trace IDs for a pipeline.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
List of trace IDs
- list_pipelines() → List[str]
List all pipeline IDs in this run.
- Returns:
List of pipeline IDs (e.g., ["0001_abc123", "0002_def456"])
- load_execution_trace(pipeline_id: str, trace_id: str) → ExecutionTrace | None
Load a specific execution trace from the pipeline manifest.
- Parameters:
pipeline_id – Pipeline ID
trace_id – Trace ID to load
- Returns:
ExecutionTrace instance or None if not found
- load_manifest(pipeline_id: str) → dict
Load manifest YAML file.
- Parameters:
pipeline_id – Pipeline ID (e.g., “0001_abc123”)
- Returns:
Manifest dictionary
- Raises:
FileNotFoundError – If manifest doesn’t exist
- pipeline_exists(pipeline_id: str) → bool
Check if a pipeline exists.
- Parameters:
pipeline_id – Pipeline ID
- Returns:
True if manifest exists
- save_execution_trace(pipeline_id: str, trace: ExecutionTrace) → None
Save an execution trace to the pipeline manifest.
Execution traces record the exact path through the pipeline that produced a prediction, enabling deterministic replay for prediction, transfer, and export.
- Parameters:
pipeline_id – Pipeline ID
trace – ExecutionTrace instance to save
Note
The trace is stored in the manifest under “execution_traces” keyed by trace_id.
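The storage layout in the note, a mapping under "execution_traces" keyed by trace_id, can be illustrated on a plain dictionary. The helper names and the shape of the trace payload are hypothetical; the real method takes an ExecutionTrace instance and persists the manifest to disk.

```python
from typing import Any, Dict, List


def store_trace(manifest: Dict[str, Any], trace_id: str, trace_data: Dict[str, Any]) -> None:
    """Insert or overwrite a trace under manifest['execution_traces'][trace_id]."""
    manifest.setdefault("execution_traces", {})[trace_id] = trace_data


def list_trace_ids(manifest: Dict[str, Any]) -> List[str]:
    """Return the IDs of all stored traces (empty list if there are none)."""
    return list(manifest.get("execution_traces", {}))


manifest: Dict[str, Any] = {"schema_version": "2.0"}
# The payload below is made up purely for illustration.
store_trace(manifest, "trace_001", {"steps": [1, 2, 3]})
```

Keying by trace_id makes loading a specific trace a direct dictionary lookup.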
- save_manifest(pipeline_id: str, manifest: dict) → None
Save manifest YAML file.
- Parameters:
pipeline_id – Pipeline ID (e.g., “0001_abc123”)
manifest – Complete manifest dictionary