nirs4all.pipeline package
Subpackages
- nirs4all.pipeline.bundle package
- Submodules
- Module contents
- BundleFormat
- BundleGenerator
- BundleLoader: bundle_path, metadata, trace, pipeline_config, fold_weights, artifact_provider, get_chain_for_artifact(), get_merged_chains(), get_partitioner_routing(), get_required_metadata_columns(), get_step_info(), has_partitioner_routing(), import_artifacts_to_registry(), predict(), predict_with_metadata(), to_resolved_prediction()
- BundleMetadata: bundle_format_version, nirs4all_version, created_at, pipeline_uid, source_type, model_step_index, fold_strategy, preprocessing_chain, trace_id, original_manifest, partitioner_routing, from_dict()
- nirs4all.pipeline.config package
- Submodules
- nirs4all.pipeline.config.component_serialization module
- nirs4all.pipeline.config.context module
- nirs4all.pipeline.config.generator module
- Classes: CartesianStrategy, ChainStrategy, ExpansionStrategy, ExpansionTreeNode, GridStrategy, LogRangeStrategy, OrStrategy, RangeStrategy, SampleStrategy, ValidationError, ValidationResult, ValidationSeverity, ZipStrategy
- Functions: apply_all_constraints(), apply_exclude_constraint(), apply_mutex_constraint(), apply_requires_constraint(), batch_iter(), clear_presets(), count_combinations(), diff_configs(), expand_spec(), expand_spec_iter(), expand_spec_with_choices(), export_presets(), extract_base_node(), extract_constraints(), extract_metadata(), extract_modifiers(), extract_or_choices(), extract_range_spec(), extract_tags(), format_config_table(), get_expansion_tree(), get_preset(), get_preset_info(), get_strategy(), has_cartesian_keyword(), has_chain_keyword(), has_grid_keyword(), has_log_range_keyword(), has_or_keyword(), has_preset(), has_range_keyword(), has_sample_keyword(), has_zip_keyword(), import_presets(), is_generator_node(), is_preset_reference(), is_pure_cartesian_node(), is_pure_chain_node(), is_pure_grid_node(), is_pure_log_range_node(), is_pure_or_node(), is_pure_range_node(), is_pure_sample_node(), is_pure_zip_node(), iter_with_progress(), list_presets(), parse_constraints(), print_expansion_tree(), register_builtin_presets(), register_preset(), register_strategy(), resolve_preset(), resolve_presets_recursive(), sample_with_seed(), summarize_configs(), to_dataframe(), unregister_preset(), validate_config(), validate_constraints(), validate_expanded_configs(), validate_spec()
- nirs4all.pipeline.config.pipeline_config module
- Module contents
- ArtifactProvider
- DataSelector: partition, processing, layout, concat_source, fold_id, include_augmented, y, branch_id, branch_path, branch_name, __delitem__(), __getitem__(), __iter__(), __len__(), __setitem__(), copy(), with_augmented(), with_branch(), with_fold(), with_layout(), with_partition(), with_processing()
- ExecutionContext: selector, state, metadata, custom, aggregate_column, copy(), get_selector(), with_branch(), with_layout(), with_metadata(), with_partition(), with_processing(), with_step_number(), with_y()
- LoaderArtifactProvider: loader, trace, get_artifact(), get_artifact_by_chain(), get_artifacts_for_chain_prefix(), get_artifacts_for_step(), get_fold_artifacts(), has_artifacts_for_step()
- MapArtifactProvider
- PipelineConfigs
- PipelineState
- RuntimeContext: saver, manifest_manager, artifact_loader, artifact_provider, artifact_registry, pipeline_uid, step_runner, operation_count, substep_number, trace_recorder, retrain_config, artifact_load_counter, explainer, processing_counter, step_number, target_model, get_execution_trace(), get_trace_id(), next_artifact_load_index(), next_op(), next_processing_index(), record_input_shapes(), record_output_shapes(), record_step_artifact(), record_step_end(), record_step_start(), reset_processing_counter(), should_train_step()
- StepMetadata: keyword, step_id, augment_sample, add_feature, replace_processing, target_samples, target_features, copy(), reset_ephemeral_flags()
- count_combinations()
- expand_spec()
- serialize_component()
- Submodules
- nirs4all.pipeline.execution package
- Submodules
- Module contents
- ArtifactMeta: name, content_hash, path, format, format_version, nirs4all_version, metadata
- ExecutorBuilder: artifact_registry, workspace, build(), with_artifact_loader(), with_artifact_registry(), with_continue_on_error(), with_dataset(), with_manifest_manager(), with_mode(), with_plots_visible(), with_run_directory(), with_save_artifacts(), with_save_charts(), with_saver(), with_show_spinner(), with_step_runner(), with_verbose(), with_workspace()
- PipelineExecutor: step_runner, manifest_manager, verbose, mode, continue_on_error, artifact_registry, execute(), execute_minimal(), initialize_context(), next_op()
- PipelineOrchestrator: workspace_path, runs_dir, verbose, mode, save_artifacts, save_charts, enable_tab_reports, keep_datasets, plots_visible, execute()
- StepResult
- nirs4all.pipeline.steps package
- nirs4all.pipeline.storage package
- Subpackages
- Submodules
- Module contents
- ManifestManager: append_artifacts(), append_artifacts_v2(), append_prediction(), artifact_exists(), create_pipeline(), delete_pipeline(), extract_all_generator_choices(), extract_generator_choice(), extract_top_preprocessings(), get_artifact_path(), get_artifacts_list(), get_latest_execution_trace(), get_next_pipeline_number(), get_pipeline_path(), get_schema_version(), list_all_pipelines(), list_execution_traces(), list_pipelines(), load_execution_trace(), load_manifest(), pipeline_exists(), save_execution_trace(), save_manifest(), update_manifest(), upgrade_manifest_to_v2()
- PipelineLibrary
- PipelineWriter
- PredictionResolver
- SimulationSaver: exporter, resolver, writer, cleanup(), export_best_for_dataset(), export_best_prediction(), export_pipeline_full(), get_metadata(), get_path(), get_predict_targets(), list_files(), persist_artifact(), register(), register_workspace(), save_file(), save_json(), save_output()
- TargetResolver
- WorkspaceExporter
- nirs4all.pipeline.trace package
- Submodules
- Module contents
- ExecutionStep: step_index, operator_type, operator_class, operator_config, execution_mode, artifacts, branch_path, branch_name, duration_ms, metadata, input_chain_path, output_chain_paths, source_count, produces_branches, substep_index, input_shape, output_shape, input_features_shape, output_features_shape, add_output_chain(), from_dict(), has_artifacts(), to_dict()
- ExecutionTrace: trace_id, pipeline_uid, created_at, steps, model_step_index, fold_weights, preprocessing_chain, metadata, add_step(), finalize(), from_dict(), get_artifact_ids(), get_artifacts_by_step(), get_fold_artifact_ids(), get_model_artifact_id(), get_step(), get_steps_before(), get_steps_up_to_model(), set_model_step(), to_dict()
- MinimalPipeline: trace_id, pipeline_uid, steps, artifact_map, model_step_index, fold_weights, preprocessing_chain, metadata, get_all_chain_paths(), get_artifact_by_chain(), get_artifact_ids(), get_artifacts_for_step(), get_step(), get_step_count(), get_step_indices(), has_step()
- MinimalPipelineStep: step_index, substep_index, step_config, execution_mode, artifacts, operator_type, operator_class, branch_path, branch_name, depends_on, get_artifact_by_chain(), get_artifact_ids(), get_artifacts_by_chain(), has_artifacts()
- StepArtifacts: artifact_ids, primary_artifact_id, fold_artifact_ids, primary_artifacts, by_branch, by_source, by_chain, metadata, add_artifact(), add_fold_artifact(), from_dict(), get_artifact_by_chain(), get_artifacts_for_branch(), get_artifacts_for_source(), merge(), to_dict()
- StepExecutionMode
- TraceBasedExtractor: include_skipped, preserve_order, extract(), extract_for_branch(), extract_for_branch_name(), extract_for_step(), get_required_artifact_ids(), get_step_dependency_graph(), validate_trace_for_prediction()
- TraceRecorder: trace, current_step, step_start_time, pipeline_id, trace_id, add_step_metadata(), build_chain_for_artifact(), current_branch_path(), current_chain(), end_step(), enter_branch(), exit_branch(), finalize(), get_current_step_index(), has_model_step(), in_branch(), mark_step_skipped(), pop_chain(), push_chain(), record_artifact(), record_input_shapes(), record_output_shapes(), reset_chain_to(), start_branch_step(), start_branch_substep(), start_step()
Submodules
- nirs4all.pipeline.explainer module
- nirs4all.pipeline.minimal_predictor module
- MinimalArtifactProvider: minimal_pipeline, artifact_loader, target_sub_index, target_model_name, get_artifact(), get_artifacts_for_step(), get_fold_artifacts(), get_fold_weights(), has_artifacts_for_step()
- MinimalPredictor
- nirs4all.pipeline.predictor module
- nirs4all.pipeline.resolver module
- FoldStrategy
- PredictionResolver
- ResolvedPrediction: source_type, minimal_pipeline, artifact_provider, trace, fold_strategy, fold_weights, model_step_index, target_model, pipeline_uid, run_dir, manifest, get_preprocessing_chain(), has_fold_artifacts(), has_trace()
- SourceType: PREDICTION, FOLDER, RUN, ARTIFACT_ID, BUNDLE, TRACE_ID, MODEL_FILE, UNKNOWN
- nirs4all.pipeline.retrainer module
- ExtractedPipeline: steps, trace, artifact_provider, model_step_index, preprocessing_chain, source_pipeline_uid, metadata, get_model_step(), get_step(), set_model(), set_step()
- RetrainArtifactProvider
- RetrainConfig: mode, step_modes, new_model, epochs, learning_rate, freeze_layers, metadata, get_step_mode(), should_train_step()
- RetrainMode
- Retrainer
- StepMode
- nirs4all.pipeline.runner module
- PipelineRunner: workspace_path, verbose, mode, save_artifacts, save_charts, enable_tab_reports, continue_on_error, show_spinner, keep_datasets, plots_visible, orchestrator, predictor, explainer, raw_data, pp_data, current_run_dir, last_aggregate, last_aggregate_exclude_outliers, last_aggregate_method, library, runs_dir, explain(), export(), export_best_for_dataset(), export_model(), extract(), next_op(), predict(), retrain(), run()
- init_global_random_state()
Module contents
Pipeline module for the nirs4all package.
This module contains pipeline classes for processing workflows.
- Phase 5 Additions:
TraceBasedExtractor: Extracts minimal pipeline from execution trace
MinimalPipeline: Minimal pipeline ready for prediction replay
MinimalPipelineStep: A single step in the minimal pipeline
MinimalPredictor: Executes minimal pipeline for efficient prediction
MinimalArtifactProvider: Provides artifacts from MinimalPipeline
- Phase 6 Additions:
BundleGenerator: Creates standalone prediction bundles (.n4a, .n4a.py)
BundleLoader: Loads and predicts from exported bundles
BundleFormat: Enumeration of supported bundle formats
BundleMetadata: Bundle metadata structure
- Phase 7 Additions:
Retrainer: Handles retraining with full/transfer/finetune modes
RetrainMode: Enumeration of retrain modes
StepMode: Per-step mode override for fine-grained control
ExtractedPipeline: Extracted pipeline for inspection/modification
RetrainArtifactProvider: Artifact provider respecting retrain modes
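The Phase 7 retraining classes combine a global retrain mode with per-step overrides. The following is a minimal sketch of how such a decision could be resolved per step; `RetrainModeSketch`, `RetrainConfigSketch`, the `"train"` override value, and the rule that `transfer` and `finetune` retrain only the model step are all assumptions for illustration, not the nirs4all `RetrainConfig` implementation.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional


class RetrainModeSketch(Enum):
    # Hypothetical mirror of the full/transfer/finetune modes named above.
    FULL = "full"          # assumed: refit every step
    TRANSFER = "transfer"  # assumed: reuse fitted preprocessing, train the model
    FINETUNE = "finetune"  # assumed: reuse preprocessing, continue training the model


@dataclass
class RetrainConfigSketch:
    # Hypothetical sketch; not the nirs4all RetrainConfig.
    mode: RetrainModeSketch = RetrainModeSketch.FULL
    # Per-step overrides keyed by 1-based step index (assumed layout).
    step_modes: Dict[int, str] = field(default_factory=dict)

    def get_step_mode(self, step_index: int) -> Optional[str]:
        """Return the override for a step, or None if the global mode applies."""
        return self.step_modes.get(step_index)

    def should_train_step(self, step_index: int, is_model: bool) -> bool:
        override = self.get_step_mode(step_index)
        if override is not None:
            # Assumed override vocabulary: "train" refits, anything else reuses.
            return override == "train"
        if self.mode is RetrainModeSketch.FULL:
            return True
        # TRANSFER and FINETUNE (assumed): only the model step is trained.
        return is_model
```

For example, `RetrainConfigSketch(mode=RetrainModeSketch.TRANSFER, step_modes={2: "train"})` keeps steps 1 and 3 frozen, refits step 2 because of its override, and trains a model at step 4. A per-step override table keeps the global mode simple while still allowing fine-grained exceptions.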
- class nirs4all.pipeline.ArtifactProvider
Bases: ABC
Abstract interface for providing artifacts during prediction replay.
The ArtifactProvider enables controller-agnostic artifact injection: controllers request artifacts by step index rather than by name matching, which is deterministic and works with any controller type.
This interface is used during prediction mode to provide pre-loaded artifacts (transformers, models, etc.) to controllers without requiring them to know about the artifact storage system.
- Implementations:
MapArtifactProvider: In-memory dictionary-based provider
LoaderArtifactProvider: Wraps ArtifactLoader for lazy loading
Example
>>> provider = MapArtifactProvider(artifact_map)
>>> artifacts = provider.get_artifacts_for_step(step_index=2)
>>> for artifact_id, obj in artifacts:
...     process(obj)
- abstractmethod get_artifact(step_index: int, fold_id: int | None = None) → Any | None
Get a single artifact for a step.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifact_by_chain(chain_path: str) → Any | None
Get an artifact by its V3 chain path (optional V3 method).
- Parameters:
chain_path – Full operator chain path (e.g., "s1.MinMaxScaler>s3.PLS")
- Returns:
Artifact object or None if not found
- get_artifacts_for_chain_prefix(chain_prefix: str) → List[Tuple[str, Any]]
Get all artifacts matching a chain path prefix (optional V3 method).
- Parameters:
chain_prefix – Chain path prefix to match
- Returns:
List of (chain_path, artifact_object) tuples
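Chain paths such as `s1.MinMaxScaler>s3.PLS` are `>`-separated `s<step>.<Operator>` segments. A minimal sketch of prefix lookup over an in-memory map, assuming that format: matching whole segments rather than using a raw `str.startswith` keeps the prefix `s1.MinMaxScaler` from also matching `s1.MinMaxScalerV2>...`. The function name and the whole-segment rule are illustrative assumptions, not the documented semantics of `get_artifacts_for_chain_prefix`.

```python
from typing import Any, Dict, List, Tuple


def artifacts_for_chain_prefix(
    artifacts: Dict[str, Any], chain_prefix: str
) -> List[Tuple[str, Any]]:
    """Return (chain_path, artifact) pairs whose chain starts with chain_prefix.

    Hypothetical helper. The prefix must consist of whole '>'-separated
    segments; comparison is segment by segment, so "s1.MinMaxScaler" does
    not match a chain starting with "s1.MinMaxScalerV2".
    """
    prefix_parts = chain_prefix.split(">")
    matches = []
    for chain_path, obj in artifacts.items():
        parts = chain_path.split(">")
        if parts[: len(prefix_parts)] == prefix_parts:
            matches.append((chain_path, obj))
    # Sort by chain path for an order independent of dict insertion.
    return sorted(matches, key=lambda pair: pair[0])
```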
- abstractmethod get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) → List[Tuple[str, Any]]
Get all artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
branch_id – Optional branch ID filter
source_index – Optional source/dataset index filter for multi-source
substep_index – Optional substep index filter for branch substeps
- Returns:
List of (artifact_id, artifact_object) tuples
- abstractmethod get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) → List[Tuple[int, Any]]
Get all fold-specific artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
- get_primary_artifact(step_index: int) → Any | None
Get the primary artifact for a step.
The primary artifact is typically the main model or transformer for the step. Default implementation returns the first artifact.
- Parameters:
step_index – 1-based step index
- Returns:
Primary artifact object or None if not found
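To make the step-index contract concrete, here is a minimal in-memory provider sketch. It re-declares a stripped-down abstract base locally so it runs without nirs4all; `ProviderSketch`, `DictProviderSketch`, the `(step_index, fold_id)` key layout, and the artifact IDs are hypothetical and are not the real `ArtifactProvider` or `MapArtifactProvider`. Branch, source, and substep filters are omitted.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class ProviderSketch(ABC):
    """Stripped-down, hypothetical stand-in for ArtifactProvider."""

    @abstractmethod
    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        ...

    @abstractmethod
    def get_fold_artifacts(self, step_index: int) -> List[Tuple[int, Any]]:
        ...

    def get_primary_artifact(self, step_index: int) -> Optional[Any]:
        # Mirrors the documented default: return the first artifact, if any.
        artifacts = self.get_artifacts_for_step(step_index)
        return artifacts[0][1] if artifacts else None


class DictProviderSketch(ProviderSketch):
    def __init__(self, store: Dict[Tuple[int, Optional[int]], Tuple[str, Any]]):
        # Keys are (1-based step_index, fold_id or None); values are (artifact_id, obj).
        self._store = store

    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        return [v for (step, _), v in self._store.items() if step == step_index]

    def get_fold_artifacts(self, step_index: int) -> List[Tuple[int, Any]]:
        folds = [
            (fold, obj)
            for (step, fold), (_, obj) in self._store.items()
            if step == step_index and fold is not None
        ]
        return sorted(folds, key=lambda pair: pair[0])  # sorted by fold_id, as documented
```

A controller written against this interface only asks "what belongs to step 4?", so it needs no knowledge of how or where the artifacts were stored.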
- class nirs4all.pipeline.BundleFormat(value)
Supported bundle export formats.
- N4A
Full ZIP bundle with all artifacts and metadata
- N4A_PY
Portable Python script with embedded artifacts
- N4A = 'n4a'
- N4A_PY = 'n4a.py'
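Because `Path("model.n4a.py").suffix` is only `".py"`, the compound `.n4a.py` extension has to be matched against the full file name. A minimal sketch of inferring the bundle format from an output path, assuming a local enum that mirrors the two values above; `BundleFormatSketch` and `infer_bundle_format` are hypothetical helpers, not part of nirs4all.

```python
from enum import Enum
from pathlib import Path


class BundleFormatSketch(str, Enum):
    # Local mirror of BundleFormat's documented values.
    N4A = "n4a"
    N4A_PY = "n4a.py"


def infer_bundle_format(output_path: str) -> BundleFormatSketch:
    # Match on the full name: Path.suffix alone would return only ".py".
    name = Path(output_path).name
    if name.endswith(".n4a.py"):
        return BundleFormatSketch.N4A_PY
    if name.endswith(".n4a"):
        return BundleFormatSketch.N4A
    raise ValueError(f"Unsupported bundle extension: {name}")
```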
- class nirs4all.pipeline.BundleGenerator(workspace_path: str | Path, verbose: int = 0)
Bases: object
Generate standalone prediction bundles from trained pipelines.
This class exports trained pipelines to bundle formats that can be used for deployment, sharing, or archival without requiring the original workspace or a full nirs4all installation.
- workspace_path
Path to the workspace root
- resolver
PredictionResolver for resolving prediction sources
- verbose
Verbosity level for logging
Example
>>> generator = BundleGenerator(workspace_path)
>>> generator.export(best_prediction, "model.n4a")
>>>
>>> # Export to portable script
>>> generator.export(best_prediction, "model.n4a.py", format="n4a.py")
- export(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str | BundleFormat = BundleFormat.N4A, include_metadata: bool = True, compress: bool = True) → Path
Export a prediction source to a bundle.
- Parameters:
source – Prediction source (prediction dict, folder path, etc.)
output_path – Path for the output bundle
format – Bundle format ('n4a' or 'n4a.py')
include_metadata – Whether to include full metadata in bundle
compress – Whether to compress artifacts (for .n4a format)
- Returns:
Path to the created bundle
- Raises:
ValueError – If format is not supported
FileNotFoundError – If source cannot be resolved
- class nirs4all.pipeline.BundleLoader(bundle_path: str | Path)
Bases: object
Load and use prediction bundles.
Provides functionality for loading .n4a bundles, extracting metadata, and running predictions.
- bundle_path
Path to the bundle file
- metadata
Bundle metadata
- trace
Execution trace (if available)
- pipeline_config
Pipeline configuration
- fold_weights
Fold weights for CV ensemble
- artifact_provider
Provider for artifacts
Example
>>> loader = BundleLoader("model.n4a")
>>> print(f"Pipeline: {loader.metadata.pipeline_uid}")
>>> print(f"Preprocessing: {loader.metadata.preprocessing_chain}")
>>> y_pred = loader.predict(X_new)
- get_chain_for_artifact(artifact_key: str) → OperatorChain | None
Get the operator chain for an artifact from the bundle.
- Parameters:
artifact_key – Artifact key (e.g., "step_1", "step_4_fold0")
- Returns:
OperatorChain for the artifact or None if not found
- get_merged_chains(import_context_chain: OperatorChain, step_offset: int = 0) → Dict[str, OperatorChain]
Get all artifact chains merged with an import context chain.
Used when importing a bundle into another pipeline. Each artifact’s chain is prefixed with the import context chain.
- Parameters:
import_context_chain – Chain from the importing pipeline context
step_offset – Step offset to apply to bundle steps
- Returns:
Dict mapping artifact keys to merged chains
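The exact merge rule is not spelled out here. One plausible reading, sketched below under stated assumptions, is that each bundle step number is shifted by `step_offset` and the shifted chain is appended after the import context chain. The `s<step>.<Operator>` segment format follows the chain-path example given for `get_artifact_by_chain`; `shift_chain`, `merge_chains`, and the plain-string chains (instead of `OperatorChain` objects) are hypothetical simplifications.

```python
from typing import Dict


def shift_chain(chain_path: str, step_offset: int) -> str:
    """Add step_offset to every 's<step>.' segment of a '>'-separated chain."""
    shifted = []
    for segment in chain_path.split(">"):
        # Assumed segment format: "s<int>.<OperatorName>".
        step, _, operator = segment.partition(".")
        shifted.append(f"s{int(step[1:]) + step_offset}.{operator}")
    return ">".join(shifted)


def merge_chains(
    bundle_chains: Dict[str, str], context_chain: str, step_offset: int = 0
) -> Dict[str, str]:
    """Prefix each shifted bundle chain with the importing context chain (assumed rule)."""
    merged = {}
    for key, chain in bundle_chains.items():
        shifted = shift_chain(chain, step_offset)
        merged[key] = f"{context_chain}>{shifted}" if context_chain else shifted
    return merged
```

Under this reading, a bundle chain `s1.MinMaxScaler>s3.PLS` imported after one step `s1.SNV` with `step_offset=1` becomes `s1.SNV>s2.MinMaxScaler>s4.PLS`, so the imported artifacts keep unique, ordered positions in the host pipeline.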
- get_partitioner_routing(step_index: int | None = None) → Dict[str, Any] | None
Get partitioner routing info for a specific step or all steps.
- Parameters:
step_index – Specific step index, or None for all
- Returns:
Routing info dict or None
- get_required_metadata_columns() → List[str]
Get the metadata columns required for prediction routing.
- Returns:
List of column names needed for routing, empty if no routing needed.
- get_step_info() → List[Dict[str, Any]]
Get information about steps in the bundle.
- Returns:
List of step info dictionaries
- has_partitioner_routing() → bool
Check if the bundle has metadata partitioner routing info.
- Returns:
True if the bundle contains partitioner routing configuration.
- import_artifacts_to_registry(registry: ArtifactRegistry, import_context_chain: OperatorChain | None = None, step_offset: int = 0, new_pipeline_id: str | None = None) → Dict[str, str]
Import bundle artifacts into an artifact registry.
Registers all artifacts from this bundle into the target registry, optionally merging with an import context chain for proper V3 tracking.
- Parameters:
registry – Target ArtifactRegistry to import into
import_context_chain – Optional chain from import context to prefix
step_offset – Step offset for imported artifacts
new_pipeline_id – New pipeline ID for imported artifacts
- Returns:
Dict mapping original artifact keys to new artifact IDs
- predict(X: ndarray, branch_path: List[int] | None = None) → ndarray
Run prediction on input data.
Applies all preprocessing steps and model(s) from the bundle. Supports branching pipelines, meta-models (stacking), and CV ensembles.
- Parameters:
X – Input features as numpy array
branch_path – Optional branch path filter for multi-branch pipelines
- Returns:
Predictions as numpy array
- predict_with_metadata(X: ndarray, metadata: Dict[str, ndarray], fallback_branch: int | None = None) → ndarray
Run prediction with metadata-based sample routing.
For bundles with metadata partitioner branches, this method routes each sample to the appropriate branch based on its metadata value. Each sample is processed by the transformers and models from its matching branch.
- Parameters:
X – Input features as numpy array (n_samples, n_features)
metadata – Dict mapping column names to value arrays. Must include the column used for partitioning during training.
fallback_branch – Optional branch ID to use for samples with unknown metadata values. If None, unknown values raise a ValueError.
- Returns:
Predictions as numpy array
- Raises:
ValueError – If required metadata column is missing or samples have unknown values without fallback.
Example
>>> import numpy as np
>>> loader = BundleLoader("model.n4a")
>>> X_new = np.random.randn(100, 500)
>>> metadata = {"site": np.array(["A"] * 50 + ["B"] * 50)}
>>> y_pred = loader.predict_with_metadata(X_new, metadata)
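Conceptually, metadata routing partitions the rows of `X` by the value in the routing column, sends each group through its branch, and scatters the results back into the original row order. The sketch below shows that idea with NumPy, assuming a `routing` dict from metadata value to branch ID and one plain callable per branch; `route_predict` and that layout are hypothetical stand-ins for the bundle's per-branch transformers and models, not the `predict_with_metadata` internals.

```python
from typing import Callable, Dict, Optional

import numpy as np


def route_predict(
    X: np.ndarray,
    values: np.ndarray,
    routing: Dict[str, int],
    branches: Dict[int, Callable[[np.ndarray], np.ndarray]],
    fallback_branch: Optional[int] = None,
) -> np.ndarray:
    """Predict each sample with the branch selected by its metadata value (hypothetical)."""
    y_pred = np.empty(X.shape[0], dtype=float)
    for value in np.unique(values):
        branch_id = routing.get(value, fallback_branch)
        if branch_id is None:
            raise ValueError(f"Unknown metadata value {value!r} and no fallback_branch")
        mask = values == value
        # Run only this group's rows through its branch, then write them back in place.
        y_pred[mask] = branches[branch_id](X[mask])
    return y_pred
```

Processing one group per branch (rather than one sample at a time) keeps each branch call vectorized, and the boolean mask preserves the caller's original sample order.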
- class nirs4all.pipeline.BundleMetadata(bundle_format_version: str = '1.0', nirs4all_version: str = '', created_at: str = '', pipeline_uid: str = '', source_type: str = '', model_step_index: int | None = None, fold_strategy: str = 'weighted_average', preprocessing_chain: str = '', trace_id: str | None = None, original_manifest: Dict[str, Any] = <factory>, partitioner_routing: Dict[str, Any] = <factory>)
Bases: object
Metadata for a prediction bundle.
Contains information about the bundle format, source, and contents.
- class nirs4all.pipeline.ExecutionStep(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] = <factory>, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, artifacts: StepArtifacts = <factory>, branch_path: List[int] = <factory>, branch_name: str = '', duration_ms: float = 0.0, metadata: Dict[str, Any] = <factory>, input_chain_path: str = '', output_chain_paths: List[str] = <factory>, source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None, input_shape: Tuple[int, int] | None = None, output_shape: Tuple[int, int] | None = None, input_features_shape: List[Tuple[int, int, int]] | None = None, output_features_shape: List[Tuple[int, int, int]] | None = None)
Bases: object
Record of a single step’s execution in the trace (V3).
Captures all information needed to replay this step during prediction, including operator configuration, execution mode, and produced artifacts.
V3 additions:
- input_chain_path: Operator chain up to this step’s input
- output_chain_paths: Chains produced by this step (for branching)
- source_count: Number of X sources at this step
- produces_branches: Whether this is a branch operator
- execution_mode
How the step was executed (train/predict/skip)
- artifacts
Artifacts produced by this step
- add_output_chain(chain_path: str) → None
Add an output chain path to this step.
- Parameters:
chain_path – Operator chain path to add
- artifacts: StepArtifacts
- execution_mode: StepExecutionMode = 'train'
- classmethod from_dict(data: Dict[str, Any]) → ExecutionStep
Create ExecutionStep from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
ExecutionStep instance
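A manifest stores each step as plain, JSON-compatible data, so a `from_dict` has to turn strings such as `"train"` back into enum members. A minimal round-trip sketch with a simplified, hypothetical `StepRecordSketch` dataclass that keeps only a few of `ExecutionStep`'s fields; the `ModeSketch` member names and the dictionary layout are assumptions, with values taken from the train/predict/skip description of `execution_mode`.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, List


class ModeSketch(str, Enum):
    # Values follow the train/predict/skip description above; member names are assumed.
    TRAIN = "train"
    PREDICT = "predict"
    SKIP = "skip"


@dataclass
class StepRecordSketch:
    step_index: int
    operator_class: str = ""
    execution_mode: ModeSketch = ModeSketch.TRAIN
    output_chain_paths: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        data["execution_mode"] = self.execution_mode.value  # store the enum as a plain string
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StepRecordSketch":
        # Missing keys fall back to the dataclass defaults (assumed behaviour).
        return cls(
            step_index=data["step_index"],
            operator_class=data.get("operator_class", ""),
            execution_mode=ModeSketch(data.get("execution_mode", "train")),
            output_chain_paths=list(data.get("output_chain_paths", [])),
        )
```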
- class nirs4all.pipeline.ExecutionTrace(trace_id: str = <factory>, pipeline_uid: str = '', created_at: str = <factory>, steps: List[ExecutionStep] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)
Bases: object
Complete trace of a pipeline execution path.
Records the exact sequence of steps and artifacts that produced a prediction, enabling deterministic replay for prediction, transfer, and export.
The trace is controller-agnostic: it records what happened without encoding specific controller logic, so any controller (existing or custom) can be replayed using the same infrastructure.
- steps
Ordered list of execution steps
- fold_weights
Per-fold weights for CV ensemble (None for single model)
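When `fold_weights` is set, the per-fold model outputs are combined into a single CV-ensemble prediction, and `BundleMetadata` defaults `fold_strategy` to `'weighted_average'`. A minimal NumPy sketch of such an average, assuming weights are normalized to sum to 1 and that a missing weight dictionary means equal weights (both assumptions; `weighted_fold_average` is a hypothetical helper):

```python
from typing import Dict, Optional

import numpy as np


def weighted_fold_average(
    fold_preds: Dict[int, np.ndarray], fold_weights: Optional[Dict[int, float]] = None
) -> np.ndarray:
    """Combine per-fold predictions using per-fold weights (fold_id -> weight)."""
    fold_ids = sorted(fold_preds)
    if fold_weights is None:
        weights = np.ones(len(fold_ids))  # assumed: equal weights when none are given
    else:
        weights = np.array([fold_weights[f] for f in fold_ids], dtype=float)
    weights = weights / weights.sum()  # normalize so the weights sum to 1
    stacked = np.stack([fold_preds[f] for f in fold_ids])  # shape: (n_folds, n_samples)
    return np.tensordot(weights, stacked, axes=1)  # weighted sum over the fold axis
```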
- add_step(step: ExecutionStep) → None
Add a step to the trace.
- Parameters:
step – ExecutionStep to add
- finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) → None
Finalize the trace with summary information.
Call this after all steps have been recorded to add summary info.
- Parameters:
preprocessing_chain – Summary string of preprocessing (e.g., "SNV>SG>MinMax")
metadata – Additional metadata to merge
- classmethod from_dict(data: Dict[str, Any]) ExecutionTrace[source]
Create ExecutionTrace from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
ExecutionTrace instance
- get_artifact_ids() List[str][source]
Get all artifact IDs in this trace.
- Returns:
List of all artifact IDs across all steps
- get_artifacts_by_step(step_index: int) StepArtifacts | None[source]
Get artifacts for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepArtifacts or None if step not found
- get_fold_artifact_ids() Dict[int, str][source]
Get per-fold model artifact IDs.
- Returns:
Dictionary of fold_id -> artifact_id
- get_model_artifact_id() str | None[source]
Get the primary model artifact ID.
- Returns:
Model artifact ID or None if no model step
- get_step(step_index: int) ExecutionStep | None[source]
Get a step by its index.
- Parameters:
step_index – 1-based step index to find
- Returns:
ExecutionStep or None if not found
- get_steps_before(step_index: int) List[ExecutionStep][source]
Get all steps before a given step index.
- Parameters:
step_index – 1-based step index (exclusive)
- Returns:
List of steps with step_index < given index
- get_steps_up_to_model() List[ExecutionStep][source]
Get all steps up to and including the model step.
- Returns:
List of steps needed to reproduce the prediction
- set_model_step(step_index: int, fold_weights: Dict[int, float] | None = None) None[source]
Set the model step index and optional fold weights.
- Parameters:
step_index – Index of the model step
fold_weights – Optional per-fold weights for CV
- steps: List[ExecutionStep]
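The 1-based indexing and the boundary semantics of get_steps_before() (exclusive) and get_steps_up_to_model() (inclusive) are easy to mix up. The following library-independent sketch models them with hypothetical ToyStep and ToyTrace dataclasses; it is not the nirs4all implementation, and it assumes steps are stored in execution order and that a trace without a model step replays every recorded step.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyStep:
    # Hypothetical stand-in for ExecutionStep: only the fields this sketch needs.
    step_index: int  # 1-based, as in ExecutionTrace
    operator: str


@dataclass
class ToyTrace:
    # Hypothetical stand-in for ExecutionTrace.
    steps: List[ToyStep] = field(default_factory=list)
    model_step_index: Optional[int] = None

    def get_step(self, step_index: int) -> Optional[ToyStep]:
        # Look up by recorded index; None when the step is absent.
        return next((s for s in self.steps if s.step_index == step_index), None)

    def get_steps_before(self, step_index: int) -> List[ToyStep]:
        # Exclusive bound: the step at step_index itself is not returned.
        return [s for s in self.steps if s.step_index < step_index]

    def get_steps_up_to_model(self) -> List[ToyStep]:
        # Inclusive bound: the model step is part of what must be replayed.
        if self.model_step_index is None:
            return list(self.steps)  # assumption made for this sketch only
        return [s for s in self.steps if s.step_index <= self.model_step_index]


trace = ToyTrace(
    steps=[ToyStep(1, "SNV"), ToyStep(2, "SG"), ToyStep(3, "PLS"), ToyStep(4, "chart")],
    model_step_index=3,
)
```

With this trace, get_steps_before(3) yields the SNV and SG steps, while get_steps_up_to_model() also includes PLS and drops the trailing chart step.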
- class nirs4all.pipeline.Explainer(runner: PipelineRunner)[source]
Bases: object
Handles SHAP explanation generation for trained models.
This class manages the explanation workflow: loading saved models, replaying pipelines to capture the trained model, and generating SHAP explanations with visualizations.
- runner
Parent PipelineRunner instance
- saver
File saver for managing outputs
- manifest_manager
Manager for pipeline manifests
- pipeline_uid
Unique identifier for the pipeline
- artifact_loader
Loader for trained model artifacts
- config_path
Path to the pipeline configuration
- target_model
Metadata for the target model
- captured_model
Tuple of (model, controller) captured during replay
- capture_model(model: Any, controller: Any)[source]
Capture a model during pipeline execution for SHAP analysis.
This method is called by the model controller during explain mode to capture the trained model instance.
- Parameters:
model – Trained model instance
controller – Controller that trained the model
- explain(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'explain_dataset', shap_params: Dict[str, Any] | None = None, verbose: int = 0, plots_visible: bool = True) Tuple[Dict[str, Any], str][source]
Generate SHAP explanations for a saved model.
- Parameters:
prediction_obj – Model identifier (dict with config_path or prediction ID)
dataset – Dataset to explain on
dataset_name – Name for the dataset
shap_params – SHAP configuration parameters
verbose – Verbosity level
plots_visible – Whether to display plots interactively
- Returns:
Tuple of (shap_results_dict, output_directory_path)
Example
>>> explainer = Explainer(runner)
>>> shap_results, out_dir = explainer.explain(
...     {"config_path": "0001_abc123"},
...     X_test,
...     shap_params={"n_samples": 200, "visualizations": ["spectral", "summary"]}
... )
- class nirs4all.pipeline.ExtractedPipeline(steps: ~typing.List[~typing.Any] = <factory>, trace: ~nirs4all.pipeline.trace.execution_trace.ExecutionTrace | None = None, artifact_provider: ~nirs4all.pipeline.config.context.ArtifactProvider | None = None, model_step_index: int | None = None, preprocessing_chain: str = '', source_pipeline_uid: str = '', metadata: ~typing.Dict[str, ~typing.Any] = <factory>)[source]
Bases: object
Extracted pipeline for inspection and modification.
Represents a pipeline extracted from a trained prediction, ready for inspection, modification, or re-execution.
- steps
List of pipeline steps (can be modified)
- Type:
List[Any]
- trace
Original execution trace (read-only)
- artifact_provider
Provider for original artifacts
- Type:
ArtifactProvider | None
- artifact_provider: ArtifactProvider | None = None
- get_step(index: int) Any[source]
Get a step by 0-based index.
- Parameters:
index – 0-based step index
- Returns:
Step configuration
- set_model(model: Any) None[source]
Replace the model in the model step.
- Parameters:
model – New model to use
- set_step(index: int, step: Any) None[source]
Set a step by 0-based index.
- Parameters:
index – 0-based step index
step – New step configuration
- trace: ExecutionTrace | None = None
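set_model() replaces the model inside the model step. Assuming steps use the dict form shown in the PipelineRunner examples ({"model": ...}, optionally with a "name" key), a hypothetical helper could locate and swap the model as below. The real method may rely on model_step_index instead of scanning; this sketch simply picks the last dict step that has a "model" key.

```python
from typing import Any, List


def replace_model(steps: List[Any], new_model: Any) -> None:
    """Hypothetical helper: swap the model of the last model step, in place."""
    model_positions = [
        i for i, step in enumerate(steps) if isinstance(step, dict) and "model" in step
    ]
    if not model_positions:
        raise ValueError("no model step found")
    pos = model_positions[-1]
    # Build a new dict so a step dict shared with the original trace is not mutated;
    # other keys such as "name" are carried over unchanged.
    steps[pos] = {**steps[pos], "model": new_model}


# Strings stand in for real estimator objects.
steps = ["MinMaxScaler", {"model": "PLS", "name": "pls"}]
replace_model(steps, "RandomForest")
```

Copying the step dict instead of editing it keeps the read-only trace consistent with what was actually trained.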
- class nirs4all.pipeline.FoldStrategy(value)[source]
Strategy for combining fold predictions in CV ensembles.
- AVERAGE
Simple average of fold predictions
- WEIGHTED_AVERAGE
Weighted average using fold weights
- SINGLE
Use a single fold’s prediction
- AVERAGE = 'average'
- SINGLE = 'single'
- WEIGHTED_AVERAGE = 'weighted_average'
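The three strategies can be illustrated without the library. This is a minimal sketch, assuming weighted averaging normalizes by the sum of the fold weights and that SINGLE is given an explicit fold id; how nirs4all chooses the fold for SINGLE is not stated here.

```python
from typing import Dict, List, Optional


def combine_folds(
    fold_preds: Dict[int, List[float]],
    strategy: str = "weighted_average",
    fold_weights: Optional[Dict[int, float]] = None,
    single_fold: int = 0,
) -> List[float]:
    """Library-independent sketch of the AVERAGE, WEIGHTED_AVERAGE and SINGLE modes."""
    if not fold_preds:
        raise ValueError("no fold predictions")
    if strategy == "single":
        return list(fold_preds[single_fold])
    fold_ids = sorted(fold_preds)
    if strategy == "average":
        weights = {f: 1.0 for f in fold_ids}
    elif strategy == "weighted_average":
        if not fold_weights:
            raise ValueError("weighted_average needs fold_weights")
        weights = {f: fold_weights[f] for f in fold_ids}
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    total = sum(weights.values())
    n_samples = len(fold_preds[fold_ids[0]])
    # Per-sample weighted mean across folds.
    return [
        sum(weights[f] * fold_preds[f][i] for f in fold_ids) / total
        for i in range(n_samples)
    ]


preds = {0: [1.0, 2.0], 1: [3.0, 4.0]}
```

With fold weights {0: 3.0, 1: 1.0}, fold 0 counts three times as much, pulling the ensemble toward its predictions.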
- class nirs4all.pipeline.LoaderArtifactProvider(loader: Any, trace: Any | None = None)[source]
Bases: ArtifactProvider
Artifact provider backed by an ArtifactLoader.
Wraps an ArtifactLoader to provide artifacts on-demand with lazy loading and caching. Used when loading from a manifest for prediction.
- loader
The underlying ArtifactLoader
- trace
Optional ExecutionTrace for step-to-artifact mapping
- get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]
Get a single artifact for a step.
If trace is available, uses trace to find artifact IDs. Otherwise, uses loader’s step-based lookup.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifact_by_chain(chain_path: str) Any | None[source]
Get artifact by V3 chain path.
- Parameters:
chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)
- Returns:
Artifact object or None if not found
- get_artifacts_for_chain_prefix(chain_prefix: str) List[Tuple[str, Any]][source]
Get all artifacts matching a chain path prefix.
- Parameters:
chain_prefix – Chain path prefix to match
- Returns:
List of (chain_path, artifact_object) tuples
- get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]
Get all artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
branch_id – Optional branch ID filter
source_index – Optional source index filter for multi-source pipelines
substep_index – Optional substep index filter (not used in loader provider)
- Returns:
List of (artifact_id, artifact_object) tuples
- get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]
Get all fold-specific artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
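Chain paths join operator segments with ">" (for example “s1.MinMaxScaler>s3.PLS”). The documentation does not say whether get_artifacts_for_chain_prefix() compares raw strings or whole segments; the sketch below, with the hypothetical helper artifacts_for_chain_prefix(), chooses segment-wise matching so that a prefix never matches a longer operator name.

```python
from typing import Any, Dict, List, Tuple


def split_chain(chain_path: str) -> List[str]:
    # "s1.MinMaxScaler>s3.PLS" -> ["s1.MinMaxScaler", "s3.PLS"]
    return chain_path.split(">") if chain_path else []


def artifacts_for_chain_prefix(
    artifacts_by_chain: Dict[str, Any], chain_prefix: str
) -> List[Tuple[str, Any]]:
    """Return (chain_path, artifact) pairs whose chain starts with chain_prefix.

    Matching is segment by segment: "s1.SNV" matches "s1.SNV>s3.PLS" but not
    "s1.SNVX>s3.PLS", which a plain str.startswith() check would accept.
    """
    prefix = split_chain(chain_prefix)
    return [
        (path, obj)
        for path, obj in sorted(artifacts_by_chain.items(), key=lambda kv: kv[0])
        if split_chain(path)[: len(prefix)] == prefix
    ]


artifacts = {
    "s1.MinMaxScaler": "scaler",
    "s1.MinMaxScaler>s3.PLS": "pls",
    "s1.MinMaxScalerV2>s3.PLS": "other",
}
```

Querying the prefix "s1.MinMaxScaler" returns the scaler and the PLS model fitted on its output, but not the artifact whose first segment is “s1.MinMaxScalerV2”.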
- class nirs4all.pipeline.MapArtifactProvider(artifact_map: Dict[int, List[Tuple[str, Any]]], fold_weights: Dict[int, float] | None = None, primary_artifacts: Dict[int, str] | None = None)[source]
Bases: ArtifactProvider
In-memory artifact provider backed by a dictionary.
Provides artifacts from a pre-loaded dictionary mapping step indices to artifacts. Used when artifacts are resolved from an ExecutionTrace or when loading from a bundle.
- artifact_map
Dictionary mapping step_index to list of (artifact_id, object) tuples
- fold_weights
Optional fold weights for CV ensemble averaging
Example
>>> artifact_map = {
...     1: [("0001:1:all", snv_transformer)],
...     2: [("0001:2:0", model_fold0), ("0001:2:1", model_fold1)]
... }
>>> provider = MapArtifactProvider(artifact_map)
>>> transformer = provider.get_artifact(step_index=1)
- get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]
Get a single artifact for a step.
If fold_id is specified, returns the fold-specific artifact. Otherwise, returns the primary or first artifact.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]
Get all artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter (not used in map provider)
branch_id – Optional branch ID filter (not used in map provider)
source_index – Optional source/dataset index filter (not used in map provider)
substep_index – Optional substep index filter (not used in map provider)
- Returns:
List of (artifact_id, artifact_object) tuples
- get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]
Get all fold-specific artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter (not used in map provider)
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
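The lookup rules above (fold-specific artifact when fold_id is given, otherwise the primary or first artifact; fold artifacts sorted by fold id) can be modelled in a few lines. This is a simplified, hypothetical ToyMapProvider, not the library class; it assumes the ID layout “<pipeline>:<step>:<fold|all>” seen in the example, where the last field is a fold number or “all”.

```python
from typing import Any, Dict, List, Optional, Tuple


def parse_fold(artifact_id: str) -> Optional[int]:
    # Assumed layout "<pipeline>:<step>:<fold|all>"; "all" means not fold-specific.
    last = artifact_id.rsplit(":", 1)[-1]
    return int(last) if last.isdigit() else None


class ToyMapProvider:
    """Hypothetical, simplified model of MapArtifactProvider lookups."""

    def __init__(self, artifact_map: Dict[int, List[Tuple[str, Any]]],
                 primary_artifacts: Optional[Dict[int, str]] = None):
        self.artifact_map = artifact_map
        self.primary_artifacts = primary_artifacts or {}

    def get_artifact(self, step_index: int, fold_id: Optional[int] = None) -> Optional[Any]:
        entries = self.artifact_map.get(step_index, [])
        if fold_id is not None:
            return next((obj for aid, obj in entries if parse_fold(aid) == fold_id), None)
        primary = self.primary_artifacts.get(step_index)
        for aid, obj in entries:
            if aid == primary:
                return obj
        return entries[0][1] if entries else None  # fall back to the first artifact

    def get_fold_artifacts(self, step_index: int) -> List[Tuple[int, Any]]:
        entries = self.artifact_map.get(step_index, [])
        folds = [(parse_fold(aid), obj) for aid, obj in entries if parse_fold(aid) is not None]
        return sorted(folds, key=lambda pair: pair[0])


provider = ToyMapProvider({
    1: [("0001:1:all", "snv")],
    2: [("0001:2:1", "fold1"), ("0001:2:0", "fold0")],
})
```

Strings stand in for the transformer and fold models; note that get_fold_artifacts() reorders the folds even though they were stored out of order.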
- class nirs4all.pipeline.MinimalArtifactProvider(minimal_pipeline: MinimalPipeline, artifact_loader: Any, target_sub_index: int | None = None, target_model_name: str | None = None)[source]
Bases: ArtifactProvider
Artifact provider backed by a MinimalPipeline (V3).
Provides artifacts from the minimal pipeline’s artifact map, which contains StepArtifacts extracted from the execution trace.
This provider uses V3 ArtifactRecord metadata (chain_path, branch_path, substep_index) instead of parsing V2-style artifact IDs.
- minimal_pipeline
The source MinimalPipeline
- artifact_loader
ArtifactLoader for loading actual artifact objects
- target_sub_index
Filter artifacts by substep_index
- target_model_name
Filter artifacts by custom_name
- get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]
Get a single artifact for a step.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]
Get all artifacts for a step (V3).
Filters artifacts by branch using the branch_path from ArtifactRecord. This is critical for multisource + branching reload, where branch substep artifacts are lumped together in the execution trace but can be distinguished by their artifact records.
Returns tuples of (operator_name, artifact_object) where operator_name is derived from the object class and substep_index (e.g., “MinMaxScaler_1”). This allows transformer controllers to look up artifacts by name.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter (e.g., [0] for branch 0)
branch_id – Optional branch ID filter (used when branch_path not available)
source_index – Optional source/dataset index filter for multi-source
substep_index – Optional substep index filter for branch substeps
- Returns:
List of (operator_name, artifact_object) tuples
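The naming scheme described above (“MinMaxScaler_1”, built from the object class and substep_index) combined with branch filtering can be sketched as follows. ToyRecord is a hypothetical subset of ArtifactRecord, and the exact-match branch comparison is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class ToyRecord:
    # Hypothetical subset of ArtifactRecord metadata used for filtering.
    obj: Any
    branch_path: List[int] = field(default_factory=list)
    substep_index: Optional[int] = None


def named_artifacts(records: List[ToyRecord],
                    branch_path: Optional[List[int]] = None) -> List[Tuple[str, Any]]:
    """Filter by branch, then name each artifact "<ClassName>_<substep_index>"."""
    out = []
    for rec in records:
        if branch_path is not None and rec.branch_path != branch_path:
            continue  # artifact belongs to a different branch
        name = type(rec.obj).__name__
        if rec.substep_index is not None:
            name = f"{name}_{rec.substep_index}"
        out.append((name, rec.obj))
    return out


# Local dummy classes, not the scikit-learn estimators of the same name.
class MinMaxScaler:
    pass


class StandardScaler:
    pass


records = [
    ToyRecord(MinMaxScaler(), branch_path=[0], substep_index=1),
    ToyRecord(StandardScaler(), branch_path=[1], substep_index=1),
]
```

Because both artifacts share substep_index 1, only the branch filter keeps the two branches of a multisource pipeline apart.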
- get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]
Get all fold-specific artifacts for a step.
Filters by target_sub_index when set (for subpipelines with multiple models). When target_sub_index is set, looks through all artifact_ids instead of fold_artifact_ids because fold_artifact_ids only stores the last model’s artifacts when multiple models exist in a subpipeline.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
- class nirs4all.pipeline.MinimalPipeline(trace_id: str = '', pipeline_uid: str = '', steps: ~typing.List[~nirs4all.pipeline.trace.extractor.MinimalPipelineStep] = <factory>, artifact_map: ~typing.Dict[int, ~nirs4all.pipeline.trace.execution_trace.StepArtifacts] = <factory>, model_step_index: int | None = None, fold_weights: ~typing.Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: ~typing.Dict[str, ~typing.Any] = <factory>)[source]
Bases: object
Minimal pipeline extracted from an execution trace.
Contains only the steps needed to replay a prediction, with artifact mappings for each step. Used by MinimalPredictor for efficient prediction.
- steps
Ordered list of minimal steps to execute
- artifact_map
Mapping of step_index to the StepArtifacts recorded for that step
- Type:
Dict[int, nirs4all.pipeline.trace.execution_trace.StepArtifacts]
- artifact_map: Dict[int, StepArtifacts]
- get_all_chain_paths() Dict[str, str][source]
Get all artifacts indexed by chain path.
- Returns:
Dict mapping chain_path to artifact_id
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by V3 chain path across all steps.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifact_ids() List[str][source]
Get all artifact IDs in the minimal pipeline.
- Returns:
List of all artifact IDs across all steps
- get_artifacts_for_step(step_index: int) StepArtifacts | None[source]
Get artifacts for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepArtifacts or None if not found
- get_step(step_index: int) MinimalPipelineStep | None[source]
Get a step by its index.
- Parameters:
step_index – 1-based step index
- Returns:
MinimalPipelineStep or None if not found
- get_step_count() int[source]
Get the number of steps in the minimal pipeline.
- Returns:
Number of steps
- get_step_indices() List[int][source]
Get all step indices in execution order.
- Returns:
List of step indices
- has_step(step_index: int) bool[source]
Check if a step is included in the minimal pipeline.
- Parameters:
step_index – 1-based step index
- Returns:
True if step is included
- steps: List[MinimalPipelineStep]
- class nirs4all.pipeline.MinimalPipelineStep(step_index: int, step_config: ~typing.Any = None, execution_mode: ~nirs4all.pipeline.trace.execution_trace.StepExecutionMode = StepExecutionMode.PREDICT, artifacts: ~nirs4all.pipeline.trace.execution_trace.StepArtifacts = <factory>, operator_type: str = '', operator_class: str = '', branch_path: ~typing.List[int] = <factory>, branch_name: str = '', substep_index: int | None = None, depends_on: ~typing.Set[int] = <factory>)[source]
Bases: object
A step in the minimal pipeline for prediction replay.
Contains the step configuration and metadata needed to replay the step during prediction, without encoding controller-specific logic.
- step_config
The pipeline step configuration (dict or object)
- Type:
Any
- execution_mode
How to execute this step (train/predict/skip)
- artifacts
Artifacts for this step (from trace)
- artifacts: StepArtifacts
- execution_mode: StepExecutionMode = 'predict'
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by V3 chain path.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifact_ids() List[str][source]
Get all artifact IDs for this step.
- Returns:
List of artifact IDs
- get_artifacts_by_chain() Dict[str, str][source]
Get all artifacts indexed by chain path.
- Returns:
Dict mapping chain_path to artifact_id
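MinimalPipelineStep carries a depends_on: Set[int] field, but how the extractor uses it is not documented here. One plausible way to keep “only the steps needed to replay a prediction” is a transitive closure from the model step, sketched below with a hypothetical required_steps() helper; sorting the result assumes step indices follow execution order.

```python
from typing import Dict, List, Set


def required_steps(depends_on: Dict[int, Set[int]], model_step: int) -> List[int]:
    """Return the model step plus everything it transitively depends on, in order."""
    needed: Set[int] = set()
    stack = [model_step]
    while stack:
        idx = stack.pop()
        if idx in needed:
            continue  # already visited (shared dependency)
        needed.add(idx)
        stack.extend(depends_on.get(idx, set()))
    return sorted(needed)


# Step 3 (for example a chart) feeds nothing the model needs.
deps = {1: set(), 2: {1}, 3: set(), 4: {2}, 5: {4}}
```

For model step 4 the closure is [1, 2, 4]: step 3 is dropped, and step 5, which runs after the model, is never visited.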
- class nirs4all.pipeline.MinimalPredictor(artifact_loader: Any, run_dir: str | Path, saver: Any = None, manifest_manager: Any = None, verbose: int = 0)[source]
Bases: object
Execute minimal pipeline for prediction.
This class takes a MinimalPipeline (extracted from an ExecutionTrace) and executes only the required steps using existing controllers with artifact injection.
The MinimalPredictor achieves the Phase 5 goal of “execute only needed steps” by:
1. Using the minimal pipeline’s step list (not the full original pipeline)
2. Injecting pre-loaded artifacts via ArtifactProvider
3. Running controllers in predict mode
- artifact_loader
ArtifactLoader for loading artifacts
- run_dir
Path to run directory
- saver
Optional SimulationSaver for outputs
- manifest_manager
Optional ManifestManager
- verbose
Verbosity level
Example
>>> predictor = MinimalPredictor(artifact_loader, run_dir)
>>> y_pred, predictions = predictor.predict(minimal_pipeline, dataset)
- predict(minimal_pipeline: MinimalPipeline, dataset: SpectroDataset, target_model: Dict[str, Any] | None = None) Tuple[ndarray, Predictions][source]
Execute minimal pipeline and return predictions.
Runs only the steps in the minimal pipeline, using pre-loaded artifacts from the execution trace.
- Parameters:
minimal_pipeline – MinimalPipeline to execute
dataset – Dataset to predict on
target_model – Optional target model metadata for filtering
- Returns:
Tuple of (y_pred array, Predictions object)
- predict_with_fold_ensemble(minimal_pipeline: MinimalPipeline, dataset: SpectroDataset, fold_strategy: str = 'weighted_average') Tuple[ndarray, Predictions][source]
Execute minimal pipeline with fold ensemble averaging.
For cross-validation models, runs prediction with each fold model and combines results according to fold_strategy.
- Parameters:
minimal_pipeline – MinimalPipeline to execute
dataset – Dataset to predict on
fold_strategy – How to combine folds (“average”, “weighted_average”)
- Returns:
Tuple of (y_pred array, Predictions object)
- validate_minimal_pipeline(minimal_pipeline: MinimalPipeline) Tuple[bool, List[str]][source]
Validate that minimal pipeline can be executed.
Checks that:
- All step configs are present
- All required artifacts are loadable
- The model step is included
- Parameters:
minimal_pipeline – MinimalPipeline to validate
- Returns:
Tuple of (is_valid, list of issues)
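The replay strategy (validate first, then run only the listed steps with injected, already-fitted artifacts) can be shown with toy objects. This sketch does not use nirs4all controllers; Scale, Offset, validate() and replay() are hypothetical, and it checks only two of the three documented conditions (artifact presence and model-step inclusion, not step configs).

```python
from typing import Any, Dict, List, Tuple


class Scale:
    # Toy fitted transformer: multiplies by a factor learned at training time.
    def __init__(self, factor: float):
        self.factor = factor

    def transform(self, X: List[float]) -> List[float]:
        return [x * self.factor for x in X]


class Offset:
    # Toy fitted model: adds a learned intercept.
    def __init__(self, intercept: float):
        self.intercept = intercept

    def predict(self, X: List[float]) -> List[float]:
        return [x + self.intercept for x in X]


def validate(steps: List[int], artifacts: Dict[int, Any], model_step: int) -> Tuple[bool, List[str]]:
    issues = [f"step {i}: missing artifact" for i in steps if i not in artifacts]
    if model_step not in steps:
        issues.append(f"model step {model_step} not included")
    return (not issues, issues)


def replay(steps: List[int], artifacts: Dict[int, Any], model_step: int, X: List[float]) -> List[float]:
    """Run only the listed steps in predict mode, using pre-loaded artifacts."""
    ok, issues = validate(steps, artifacts, model_step)
    if not ok:
        raise ValueError("; ".join(issues))
    for idx in steps:
        fitted = artifacts[idx]  # injected artifact, never re-fitted
        if idx == model_step:
            return fitted.predict(X)
        X = fitted.transform(X)
    raise RuntimeError("unreachable: validate() guarantees the model step is present")


artifacts = {1: Scale(2.0), 2: Offset(1.0)}
```

Validating before running means a missing artifact is reported as a list of issues up front instead of failing midway through the replay.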
- class nirs4all.pipeline.PipelineConfigs(definition: Dict | List[Any] | str, name: str = '', description: str = 'No description provided', max_generation_count: int = 10000)[source]
Bases: object
Class to hold the configuration for a pipeline.
- static get_hash(steps) str[source]
Generate a hash for the pipeline configuration.
All objects in the steps are expected to be fully JSON-serializable (no _runtime_instance), so serialization no longer needs a default=str fallback.
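A minimal sketch of hashing a JSON-serializable pipeline definition. The hash algorithm (SHA-256), the canonicalization (sorted keys, compact separators) and the 6-character length are assumptions of this sketch, not documented behaviour of get_hash().

```python
import hashlib
import json
from typing import Any


def config_hash(steps: Any, length: int = 6) -> str:
    """Deterministic short hash of a JSON-serializable pipeline definition."""
    # sort_keys makes dict key order irrelevant. No default=str fallback is used,
    # so a non-serializable object raises TypeError instead of hashing its repr.
    canonical = json.dumps(steps, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]
```

Failing loudly on non-serializable objects is the point of dropping default=str: a repr such as “<object at 0x...>” would change between runs and make the hash non-reproducible.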
- class nirs4all.pipeline.PipelineLibrary(workspace_path: Path)[source]
Bases: object
Manages reusable pipeline templates in the workspace library.
Templates are stored in workspace/library/{category}/{template_name}/. Each template contains:
- pipeline.json: The pipeline configuration
- metadata.json: Description, tags, performance metrics, etc.
- README.md: Human-readable documentation
- copy_from_pipeline(pipeline_dir: Path, name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, extract_metrics: bool = True) Path[source]
Copy a successful pipeline to the library as a template.
- Parameters:
pipeline_dir – Path to pipeline directory (e.g., workspace/runs/…/0001_hash/)
name – Template name
category – Category for the template
description – Description of the template
tags – Tags for filtering
extract_metrics – Whether to extract metrics from manifest
- Returns:
Path to saved template
- delete_template(name: str, category: str | None = None) None[source]
Delete a template from the library.
- Parameters:
name – Template name
category – Optional category to search in
- Raises:
FileNotFoundError – If template not found
- export_template(name: str, export_path: str | Path, category: str | None = None) Path[source]
Export a template to a standalone directory.
- Parameters:
name – Template name
export_path – Destination directory
category – Optional category to search in
- Returns:
Path to exported template
- get_template_metadata(name: str, category: str | None = None) Dict[str, Any][source]
Get metadata for a template.
- Parameters:
name – Template name
category – Optional category to search in
- Returns:
Metadata dictionary
- import_template(import_path: str | Path, category: str = 'general', overwrite: bool = False) Path[source]
Import a template from an external directory.
- Parameters:
import_path – Path to template directory
category – Category to import into
overwrite – Whether to overwrite existing template
- Returns:
Path to imported template in library
- list_templates(category: str | None = None, tags: List[str] | None = None) List[Dict[str, Any]][source]
List all templates, optionally filtered by category and tags.
- Parameters:
category – Optional category filter
tags – Optional list of tags to filter by (matches any)
- Returns:
List of template metadata dictionaries
- load_template(name: str, category: str | None = None) Dict[str, Any][source]
Load a pipeline template by name.
- Parameters:
name – Template name
category – Optional category to search in (searches all if None)
- Returns:
Pipeline configuration dictionary
- Raises:
FileNotFoundError – If template not found
- save_template(pipeline_config: Dict[str, Any], name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, metrics: Dict[str, float] | None = None, notes: str = '', overwrite: bool = False) Path[source]
Save a pipeline configuration as a reusable template.
- Parameters:
pipeline_config – Pipeline configuration dictionary
name – Template name (will be sanitized for filesystem)
category – Category/folder (e.g., “preprocessing”, “modeling”, “full_pipeline”)
description – Short description of the template
tags – List of tags for filtering (e.g., [“classification”, “neural_network”])
metrics – Performance metrics (e.g., {“accuracy”: 0.95, “f1”: 0.93})
notes – Additional notes or usage instructions
overwrite – Whether to overwrite existing template
- Returns:
Path to saved template directory
- Raises:
FileExistsError – If template exists and overwrite=False
ValueError – If name contains invalid characters
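The documented directory layout and error behaviour of save_template() can be sketched with the standard library alone. save_template_sketch() is hypothetical: which characters count as invalid, how names are sanitized (here, whitespace becomes underscores), and the exact metadata keys are all assumptions.

```python
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional


def save_template_sketch(library_root: Path, pipeline_config: Dict[str, Any], name: str,
                         category: str = "general", description: str = "",
                         tags: Optional[List[str]] = None, overwrite: bool = False) -> Path:
    """Hypothetical sketch of the library/{category}/{template_name}/ layout."""
    # Assumed set of characters that are unsafe in file names on common platforms.
    if not name.strip() or re.search(r'[<>:"/\\|?*]', name):
        raise ValueError(f"invalid template name: {name!r}")
    safe_name = re.sub(r"\s+", "_", name.strip())
    target = library_root / category / safe_name
    if target.exists() and not overwrite:
        raise FileExistsError(target)
    target.mkdir(parents=True, exist_ok=True)
    (target / "pipeline.json").write_text(json.dumps(pipeline_config, indent=2), encoding="utf-8")
    metadata = {"name": safe_name, "category": category,
                "description": description, "tags": tags or []}
    (target / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    (target / "README.md").write_text(f"# {safe_name}\n\n{description}\n", encoding="utf-8")
    return target
```

Used with a temporary directory, a call such as save_template_sketch(Path(tmp), {"steps": []}, "my snv pls", category="modeling") creates modeling/my_snv_pls/ containing the three files, and a second call without overwrite=True raises FileExistsError.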
- class nirs4all.pipeline.PipelineRunner(workspace_path: str | Path | None = None, verbose: int = 0, mode: str = 'train', save_artifacts: bool = True, save_charts: bool = True, enable_tab_reports: bool = True, continue_on_error: bool = False, show_spinner: bool = True, keep_datasets: bool = True, plots_visible: bool = False, random_state: int | None = None, log_file: bool = True, log_format: str = 'pretty', use_unicode: bool | None = None, use_colors: bool | None = None, show_progress_bar: bool = True, json_output: bool = False)[source]
Bases: object
Main pipeline execution interface.
Orchestrates pipeline execution on datasets, providing a simplified interface for training, prediction, and explanation workflows. Delegates actual execution to PipelineOrchestrator, Predictor, and Explainer.
- workspace_path
Root workspace directory
- Type:
Path
- orchestrator
Underlying orchestrator for execution
Example
>>> # Training workflow
>>> runner = PipelineRunner(workspace_path="./workspace", verbose=1)
>>> pipeline = [{"preprocessing": StandardScaler()}, {"model": SVC()}]
>>> X, y = load_data()
>>> predictions, dataset_preds = runner.run(pipeline, (X, y))
>>>
>>> # Prediction workflow
>>> runner = PipelineRunner(mode="predict")
>>> y_pred, preds = runner.predict(best_model, X_new)
>>>
>>> # Explanation workflow
>>> runner = PipelineRunner(mode="explain")
>>> shap_results, out_dir = runner.explain(best_model, X_test)
- property current_run_dir: Path | None
Get current run directory.
- Returns:
Path to current run directory, or None if not set
- explain(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'explain_dataset', shap_params: Dict[str, Any] | None = None, verbose: int = 0, plots_visible: bool = True) Tuple[Dict[str, Any], str][source]
Generate SHAP explanations for a saved model.
Delegates to Explainer class for actual execution.
- Parameters:
prediction_obj – Model identifier (dict with config_path or prediction ID)
dataset – Dataset to explain on
dataset_name – Name for the dataset
shap_params – SHAP configuration parameters
verbose – Verbosity level
plots_visible – Whether to display plots interactively
- Returns:
Tuple of (shap_results_dict, output_directory_path)
- export(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str = 'n4a', include_metadata: bool = True, compress: bool = True) Path[source]
Export a trained pipeline to a standalone bundle.
Creates a self-contained prediction bundle that can be used for deployment, sharing, or archival without requiring the original workspace or full nirs4all installation.
- Supported formats:
‘n4a’: Full bundle (ZIP archive with artifacts and metadata)
‘n4a.py’: Portable Python script with embedded artifacts
- Phase 6 Feature:
This method enables exporting trained pipelines as standalone bundles that can be loaded and used for prediction without the original workspace structure.
- Parameters:
source – Prediction source to export. Can be:
- prediction dict: From a previous run’s Predictions object
- folder path: Path to a pipeline directory
- Run object: Best prediction from a Run
output_path – Path for the output bundle file
format – Bundle format (‘n4a’ or ‘n4a.py’)
include_metadata – Whether to include full metadata in bundle
compress – Whether to compress artifacts (for .n4a format)
- Returns:
Path to the created bundle file
- Raises:
ValueError – If format is not supported
FileNotFoundError – If source cannot be resolved
Example
>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Export to .n4a bundle
>>> runner.export(best_pred, "exports/wheat_model.n4a")
>>>
>>> # Export to portable Python script
>>> runner.export(best_pred, "exports/wheat_model.n4a.py", format='n4a.py')
>>>
>>> # Later, predict from bundle
>>> y_pred, _ = runner.predict("exports/wheat_model.n4a", X_new)
- export_best_for_dataset(dataset_name: str, mode: str = 'predictions') Path | None[source]
Export best results for a dataset to exports/ folder.
- Parameters:
dataset_name – Name of the dataset to export
mode – Export mode (‘predictions’ or other)
- Returns:
Path to exported file, or None if export failed
- export_model(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str | None = None, fold: int | None = None) Path[source]
Export only the model artifact from a trained pipeline.
Unlike export() which creates a full bundle with all preprocessing artifacts and metadata, this method exports just the model binary. This is useful when you want a lightweight model file that can be loaded directly into other pipelines or used with external tools.
The output format is determined by the file extension or can be specified explicitly. The model can then be reloaded:
- As a direct path in a pipeline config: {“model”: “path/to/model.joblib”}
- As a prediction source: runner.predict(“path/to/model.joblib”, data)
- Parameters:
source – Prediction source to export from. Can be:
- prediction dict: From a previous run’s Predictions object
- folder path: Path to a pipeline directory
- bundle path: Path to a .n4a bundle
output_path – Path for the output model file. Extension determines format: .joblib, .pkl, .h5, .keras, .pt
format – Optional explicit format (‘joblib’, ‘pickle’, ‘keras_h5’). If None, determined from output_path extension.
fold – Optional fold index to export. If None, exports fold 0 or the primary model artifact.
- Returns:
Path to the created model file
- Raises:
ValueError – If no model artifact found
FileNotFoundError – If source cannot be resolved
Example
>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Export just the model
>>> runner.export_model(best_pred, "exports/pls_model.joblib")
>>>
>>> # Later, use in new pipeline
>>> new_pipeline = [
...     MinMaxScaler(),
...     {"model": "exports/pls_model.joblib", "name": "pretrained"}
... ]
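The rule “an explicit format wins, otherwise the extension decides” can be sketched as follows. The names 'joblib', 'pickle' and 'keras_h5' come from the parameter description above; the mapping of .h5 to 'keras_h5' is inferred, and 'keras' and 'torch' are placeholder names invented for this sketch. resolve_model_format() is hypothetical, and its fmt argument mirrors the documented format parameter.

```python
from pathlib import Path
from typing import Optional

# 'joblib', 'pickle' and 'keras_h5' are documented format names;
# 'keras' and 'torch' are placeholders chosen for this sketch.
_EXT_TO_FORMAT = {
    ".joblib": "joblib",
    ".pkl": "pickle",
    ".h5": "keras_h5",
    ".keras": "keras",
    ".pt": "torch",
}


def resolve_model_format(output_path: str, fmt: Optional[str] = None) -> str:
    """Explicit format wins; otherwise infer it from the file extension."""
    if fmt is not None:
        return fmt
    suffix = Path(output_path).suffix.lower()
    try:
        return _EXT_TO_FORMAT[suffix]
    except KeyError:
        raise ValueError(f"cannot infer model format from extension {suffix!r}") from None
```

Lower-casing the suffix means “model.PKL” and “model.pkl” resolve to the same format.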
- extract(source: Dict[str, Any] | str | Path) ExtractedPipeline[source]
Extract a trained pipeline for inspection or modification.
Loads a trained pipeline from a prediction source and returns an ExtractedPipeline object that can be inspected, modified, and then executed with runner.run().
- Phase 7 Feature:
This method enables extracting and modifying trained pipelines without retraining from scratch.
- Parameters:
source – Prediction source to extract. Can be:
- prediction dict: From a previous run’s Predictions object
- folder path: Path to a pipeline directory
- Run object: Best prediction from a Run
- artifact_id: Direct artifact reference
- bundle: Exported prediction bundle (.n4a)
- Returns:
ExtractedPipeline object with:
steps: List of pipeline steps (can be modified)
trace: Original execution trace (read-only)
artifact_provider: Provider for original artifacts
model_step_index: Index of the model step
preprocessing_chain: Summary of preprocessing
- Return type:
ExtractedPipeline
Example
>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Extract for inspection
>>> extracted = runner.extract(best_pred)
>>> print(f"Steps: {len(extracted.steps)}")
>>> print(f"Preprocessing: {extracted.preprocessing_chain}")
>>>
>>> # Modify and run
>>> from sklearn.ensemble import RandomForestRegressor
>>> extracted.set_model(RandomForestRegressor())
>>> new_preds, _ = runner.run(extracted.steps, new_data)
- property last_aggregate: str | None
Get aggregate column from the last executed dataset.
Returns the aggregation setting from the last dataset processed by run(). This can be used to create a PredictionAnalyzer with matching defaults.
- Returns:
Aggregate column name (‘y’ for y-based aggregation, column name for metadata-based aggregation, or None if no aggregation was set).
Example
>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, DatasetConfigs(path, aggregate='sample_id'))
>>> # Create analyzer with same aggregate setting
>>> analyzer = PredictionAnalyzer(predictions, default_aggregate=runner.last_aggregate)
- property last_aggregate_exclude_outliers: bool
Get aggregate exclude_outliers setting from the last executed dataset.
- Returns:
True if T² outlier exclusion was enabled, False otherwise.
- property last_aggregate_method: str | None
Get aggregate method from the last executed dataset.
- Returns:
Aggregate method (‘mean’, ‘median’, ‘vote’) or None for default.
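Metadata-based aggregation collapses repeated measurements of the same sample (for example, grouped by a sample_id column) into one prediction. The sketch below illustrates the three documented methods with the standard library; aggregate_predictions() is hypothetical, and it does not model the T² outlier exclusion controlled by last_aggregate_exclude_outliers.

```python
from collections import Counter, defaultdict
from statistics import mean, median
from typing import Any, Dict, Hashable, List, Sequence


def _vote(values: List[Any]) -> Any:
    # Majority label; among equal counts, the first one seen wins.
    return Counter(values).most_common(1)[0][0]


def aggregate_predictions(groups: Sequence[Hashable], preds: Sequence[Any],
                          method: str = "mean") -> Dict[Hashable, Any]:
    """Collapse per-spectrum predictions to one value per group."""
    buckets: Dict[Hashable, List[Any]] = defaultdict(list)
    for group, pred in zip(groups, preds):
        buckets[group].append(pred)
    reducers = {"mean": mean, "median": median, "vote": _vote}
    if method not in reducers:
        raise ValueError(f"unknown aggregation method: {method!r}")
    reduce = reducers[method]
    return {group: reduce(values) for group, values in buckets.items()}


groups = ["s1", "s1", "s2", "s2", "s2"]
preds = [10.0, 12.0, 5.0, 6.0, 100.0]
```

With these values, sample s2 aggregates to 37.0 by mean but 6.0 by median, which shows why the median is more robust when one replicate spectrum is anomalous.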
- property library: PipelineLibrary
Get pipeline library for template management.
- Returns:
PipelineLibrary instance for managing pipeline templates
- next_op() int[source]
Get the next operation ID (for controller compatibility).
- Returns:
Next operation counter value
- predict(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'prediction_dataset', all_predictions: bool = False, verbose: int = 0) Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions][source]
Run prediction using a saved model on new dataset.
Delegates to Predictor class for actual execution.
- Parameters:
prediction_obj – Model identifier (dict with config_path or prediction ID)
dataset – New dataset to predict on
dataset_name – Name for the dataset
all_predictions – If True, return all predictions; if False, return single best
verbose – Verbosity level
- Returns:
If all_predictions=False: (y_pred, predictions). If all_predictions=True: (predictions_dict, predictions).
- Return type:
Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions]
- retrain(source: Dict[str, Any] | str | Path, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], mode: str = 'full', dataset_name: str = 'retrain_dataset', new_model: Any | None = None, epochs: int | None = None, step_modes: List[StepMode] | None = None, verbose: int = 0, **kwargs) Tuple[Predictions, Dict[str, Any]][source]
Retrain a pipeline on new data.
Supports three retraining modes: full (train from scratch with the same pipeline structure), transfer (use existing preprocessing artifacts and train a new model), and finetune (continue training the existing model with new data).
- Phase 7 Feature:
This method enables retraining pipelines without having to reconstruct the pipeline configuration manually. It uses the resolved prediction source (from Phase 3/4) to extract the pipeline structure and optionally reuse preprocessing artifacts.
- Parameters:
source – Prediction source to retrain from. Can be a prediction dict (from a previous run’s Predictions object), a folder path (path to a pipeline directory), a Run object (its best prediction is used), an artifact_id (direct artifact reference), or an exported prediction bundle (.n4a)
dataset – New dataset to train on. Supports same formats as run()
mode – Retrain mode: ‘full’ (train everything from scratch with the same pipeline structure), ‘transfer’ (use existing preprocessing, train a new model), or ‘finetune’ (continue training the existing model)
dataset_name – Name for the dataset if array-based
new_model – Optional new model for transfer mode (replaces original)
epochs – Optional number of epochs for fine-tuning
step_modes – Optional per-step mode overrides for fine-grained control
verbose – Verbosity level
**kwargs – Additional parameters: learning_rate (learning rate for fine-tuning) and freeze_layers (list of layers to freeze during fine-tuning)
- Returns:
Tuple of (run_predictions, datasets_predictions)
- Raises:
ValueError – If mode is invalid or source cannot be resolved
FileNotFoundError – If source references files that don’t exist
Example
>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Full retrain on new data
>>> new_preds, _ = runner.retrain(best_pred, new_data, mode='full')
>>>
>>> # Transfer: use preprocessing from old model, train new one
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='transfer',
...     new_model=XGBRegressor()
... )
>>>
>>> # Finetune: continue training existing model
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='finetune', epochs=10
... )
>>>
>>> # Fine-grained control: specify per-step modes
>>> from nirs4all.pipeline import StepMode
>>> step_modes = [
...     StepMode(step_index=1, mode='predict'),  # Use existing
...     StepMode(step_index=2, mode='train'),    # Retrain
... ]
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='full', step_modes=step_modes
... )
- run(pipeline: PipelineConfigs | List[Any] | Dict | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], pipeline_name: str = '', dataset_name: str = 'dataset', max_generation_count: int = 10000) → Tuple[Predictions, Dict[str, Any]]
Execute pipeline on dataset(s).
Main entry point for training workflows. Executes one or more pipeline configurations on one or more datasets, tracking predictions and artifacts.
- Parameters:
pipeline – Pipeline definition (PipelineConfigs, list of steps, dict, or path)
dataset – Dataset definition (see DatasetConfigs for supported formats)
pipeline_name – Optional pipeline name for identification
dataset_name – Name for array-based datasets
max_generation_count – Maximum number of pipeline combinations to generate
- Returns:
Tuple of (run_predictions, datasets_predictions)
- class nirs4all.pipeline.PipelineWriter(pipeline_dir: Path, save_charts: bool = True)
Bases: object
Writes files within a pipeline directory.
Focused responsibility: File I/O operations for a single pipeline.
- list_files() → list[str]
List all files in the pipeline directory.
- Returns:
List of filenames
- save_file(filename: str, content: str, overwrite: bool = True, encoding: str = 'utf-8', warn_on_overwrite: bool = True) → Path
Save a text file to the pipeline directory.
- Parameters:
filename – Name of file to create
content – Text content to write
overwrite – Whether to overwrite existing files
encoding – Text encoding (default: utf-8)
warn_on_overwrite – Whether to warn when overwriting
- Returns:
Path to saved file
- Raises:
FileExistsError – If file exists and overwrite=False
- save_json(filename: str, data: Any, overwrite: bool = True, indent: int | None = 2) → Path
Save data as JSON file.
- Parameters:
filename – Name of the file (.json is appended if missing)
data – Data to serialize as JSON
overwrite – Whether to overwrite existing files
indent – JSON indentation (None for compact)
- Returns:
Path to saved file
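The overwrite rules documented for save_file and the suffix rule for save_json can be sketched with pathlib and the standard json and warnings modules. These are free functions written for illustration (hypothetical names mirroring the methods, with an explicit directory argument); they are not the PipelineWriter code, and whether the real save_json also warns on overwrite is not stated in this reference.

```python
import json
import warnings
from pathlib import Path
from typing import Any, Optional


def save_file(directory: Path, filename: str, content: str, overwrite: bool = True,
              encoding: str = "utf-8", warn_on_overwrite: bool = True) -> Path:
    """Write a text file following the documented overwrite rules (illustrative sketch)."""
    path = Path(directory) / filename
    if path.exists():
        if not overwrite:
            raise FileExistsError(f"{path} already exists and overwrite=False")
        if warn_on_overwrite:
            warnings.warn(f"Overwriting existing file {path}")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding=encoding)
    return path


def save_json(directory: Path, filename: str, data: Any, overwrite: bool = True,
              indent: Optional[int] = 2) -> Path:
    """Serialize data as JSON, appending '.json' when the suffix is missing (sketch)."""
    if not filename.endswith(".json"):
        filename += ".json"
    # indent=None produces compact single-line JSON.
    return save_file(directory, filename, json.dumps(data, indent=indent), overwrite=overwrite)
```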
- save_output(name: str, data: bytes | str, extension: str = '.png') → Path | None
Save a human-readable output file (chart, report, etc.).
- Parameters:
name – Output name (e.g., “2D_Chart”)
data – Binary or text data to save
extension – File extension (e.g., “.png”, “.csv”, “.txt”)
- Returns:
Path to saved file, or None if save_charts=False
- class nirs4all.pipeline.PredictionResolver(workspace_path: str | Path, runs_dir: str | Path | None = None)
Bases: object
Resolves any prediction source to executable components.
This class provides a unified interface for resolving prediction sources to the components needed for prediction replay, regardless of the source type (dict, folder, run, artifact_id, bundle).
The resolver is designed to be controller-agnostic: it has no knowledge of specific controller types; instead it provides artifacts and an execution trace that any controller can use.
- workspace_path
Root workspace directory
- runs_dir
Directory containing run outputs
Example
>>> resolver = PredictionResolver(workspace_path)
>>> resolved = resolver.resolve(best_prediction)
>>> # Execute using resolved components
>>> provider = resolved.artifact_provider
>>> artifacts = provider.get_artifacts_for_step(step_index=1)
- resolve(source: Dict[str, Any] | str | Path | Any, verbose: int = 0) → ResolvedPrediction
Resolve any prediction source to executable components.
Detects the source type and delegates to the appropriate resolver.
- Parameters:
source – Prediction source (dict, folder path, Run, artifact_id, bundle)
verbose – Verbosity level for logging
- Returns:
ResolvedPrediction with all components for replay
- Raises:
ValueError – If source type cannot be determined or resolved
FileNotFoundError – If referenced files/directories don’t exist
- class nirs4all.pipeline.Predictor(runner: PipelineRunner, use_minimal_pipeline: bool = True)
Bases: object
Handles prediction using trained pipelines.
This class manages the prediction workflow: loading saved models, replaying pipeline configurations, and generating predictions on new data.
- Phase 5 Enhancement:
When use_minimal_pipeline=True (the default), the predictor:
1. Checks whether an execution trace is available for the prediction.
2. Extracts the minimal pipeline (only the required steps) from the trace.
3. Executes only those steps, significantly reducing prediction time.
This is especially beneficial for complex pipelines with multiple preprocessing options, branches, or steps that aren’t needed for the specific model being predicted.
- runner
Parent PipelineRunner instance
- saver
File saver for managing outputs
- manifest_manager
Manager for pipeline manifests
- pipeline_uid
Unique identifier for the pipeline
- artifact_loader
Loader for trained model artifacts
- config_path
Path to the pipeline configuration
- target_model
Metadata for the target model
- use_minimal_pipeline
Whether to use minimal pipeline execution (Phase 5)
- predict(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'prediction_dataset', all_predictions: bool = False, verbose: int = 0) → Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions]
Run prediction using a saved model on new dataset.
- Phase 5 Enhancement:
When use_minimal_pipeline=True and an execution trace is available, this method will use TraceBasedExtractor to extract and execute only the required steps, improving prediction speed.
- Parameters:
prediction_obj – Model identifier (dict with config_path or prediction ID)
dataset – New dataset to predict on
dataset_name – Name for the dataset
all_predictions – If True, return all predictions; if False, return single best
verbose – Verbosity level
- Returns:
(y_pred, predictions) if all_predictions=False; (predictions_dict, predictions) if all_predictions=True
Example
>>> predictor = Predictor(runner)
>>> y_pred, preds = predictor.predict(
...     {"config_path": "0001_abc123"},
...     X_new
... )
- class nirs4all.pipeline.ResolvedPrediction(source_type: SourceType = SourceType.UNKNOWN, minimal_pipeline: List[Any] = <factory>, artifact_provider: ArtifactProvider | None = None, trace: ExecutionTrace | None = None, fold_strategy: FoldStrategy = FoldStrategy.WEIGHTED_AVERAGE, fold_weights: Dict[int, float] = <factory>, model_step_index: int | None = None, target_model: Dict[str, Any] = <factory>, pipeline_uid: str = '', run_dir: Path | None = None, manifest: Dict[str, Any] = <factory>)
Bases: object
Normalized prediction source ready for execution.
Contains all components needed to replay a prediction: minimal_pipeline (the subset of steps needed for this prediction), artifact_provider (provides artifacts by step index), trace (execution trace for deterministic replay), fold_strategy (how fold predictions are combined in cross-validation), and fold_weights (per-fold weights for the weighted average).
- source_type
Type of the original source
- minimal_pipeline
List of pipeline steps needed for replay
- Type:
List[Any]
- artifact_provider
Provider for step artifacts
- Type:
ArtifactProvider | None
- trace
ExecutionTrace if available
- fold_strategy
Strategy for combining folds
- run_dir
Path to run directory
- Type:
pathlib.Path | None
- artifact_provider: ArtifactProvider | None = None
- fold_strategy: FoldStrategy = 'weighted_average'
- get_preprocessing_chain() → str
Get the preprocessing chain summary.
- Returns:
Preprocessing chain string (e.g., “SNV>SG>MinMax”), or an empty string if none is recorded
- has_fold_artifacts() → bool
Check if fold-specific artifacts are available.
- Returns:
True if this is a CV ensemble with multiple folds
- has_trace() → bool
Check if execution trace is available.
- Returns:
True if trace is available for deterministic replay
- source_type: SourceType = 'unknown'
- trace: ExecutionTrace | None = None
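The 'weighted_average' fold strategy can be pictured as a weighted mean of per-fold prediction vectors. The sketch below uses numpy.average; the function name `combine_folds`, the fallback to equal weights when no weights are given, and treating a missing fold weight as 0 are assumptions, not documented nirs4all behavior.

```python
from typing import Mapping, Optional

import numpy as np


def combine_folds(fold_preds: Mapping[int, np.ndarray],
                  fold_weights: Optional[Mapping[int, float]] = None) -> np.ndarray:
    """Weighted average of per-fold prediction vectors (sketch of a weighted_average strategy)."""
    folds = sorted(fold_preds)
    # Shape (n_folds, n_samples): one row per fold.
    stacked = np.stack([np.asarray(fold_preds[f], dtype=float) for f in folds])
    if not fold_weights:
        weights = np.ones(len(folds))  # assumption: no weights means equal weighting
    else:
        weights = np.array([fold_weights.get(f, 0.0) for f in folds], dtype=float)
    if weights.sum() <= 0:
        raise ValueError("fold weights must sum to a positive value")
    # np.average divides by the sum of the weights, so they need not be normalized.
    return np.average(stacked, axis=0, weights=weights)
```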
- class nirs4all.pipeline.RetrainArtifactProvider(base_provider: ArtifactProvider, retrain_config: RetrainConfig, trace: ExecutionTrace | None = None)
Bases: ArtifactProvider
Artifact provider for retraining that respects step modes.
Provides artifacts only for steps that should use existing artifacts (i.e., mode=‘predict’), while returning None for steps that should train.
- base_provider
Underlying artifact provider
- retrain_config
Configuration determining which steps use artifacts
- trace
Execution trace for step type detection
- get_artifact(step_index: int, fold_id: int | None = None) → Any | None
Get a single artifact for a step if applicable.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object, or None if the step should train
- get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None) → List[Tuple[str, Any]]
Get all artifacts for a step if applicable.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (artifact_id, artifact_object) tuples, or an empty list if the step should train
- get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) → List[Tuple[int, Any]]
Get all fold-specific artifacts for a step if applicable.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (fold_id, artifact_object) tuples, or an empty list if the step should train
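The gating behavior of RetrainArtifactProvider (artifacts for 'predict' steps, nothing for steps that train) reduces to a small wrapper around a base provider. Both classes below are hypothetical stand-ins with simplified signatures (no fold or branch filtering), written only to show the idea.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class DictArtifactProvider:
    """Toy base provider: step_index -> list of (artifact_id, artifact) pairs (hypothetical)."""
    artifacts: Dict[int, List[Tuple[str, Any]]] = field(default_factory=dict)

    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        return list(self.artifacts.get(step_index, []))


@dataclass
class ModeGatedProvider:
    """Hand out stored artifacts only for steps whose mode is 'predict'."""
    base: DictArtifactProvider
    step_modes: Dict[int, str]  # step_index -> 'train' or 'predict'
    default_mode: str = "train"

    def should_reuse(self, step_index: int) -> bool:
        return self.step_modes.get(step_index, self.default_mode) == "predict"

    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        if not self.should_reuse(step_index):
            return []  # the step retrains, so no stored artifact is returned
        return self.base.get_artifacts_for_step(step_index)

    def get_artifact(self, step_index: int) -> Optional[Any]:
        found = self.get_artifacts_for_step(step_index)
        return found[0][1] if found else None
```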
- class nirs4all.pipeline.RetrainConfig(mode: RetrainMode = RetrainMode.FULL, step_modes: List[StepMode] = <factory>, new_model: Any | None = None, epochs: int | None = None, learning_rate: float | None = None, freeze_layers: List[str] | None = None, metadata: Dict[str, Any] = <factory>)
Bases: object
Configuration for retraining operation.
- mode
Overall retrain mode (full, transfer, finetune)
- step_modes
Per-step mode overrides (optional, for fine-grained control)
- Type:
List[StepMode]
- new_model
Optional new model to use instead of original (for transfer)
- Type:
Any | None
- get_step_mode(step_index: int) → StepMode | None
Get mode override for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepMode if override exists, None otherwise
- mode: RetrainMode = 'full'
- class nirs4all.pipeline.RetrainMode(value)
Mode of retraining operation.
- FULL
Train everything from scratch (same pipeline structure)
- TRANSFER
Use existing preprocessing artifacts, train new model
- FINETUNE
Continue training existing model with new data
- FINETUNE = 'finetune'
- FULL = 'full'
- TRANSFER = 'transfer'
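One way to read the three modes is as a per-step plan. The sketch below mirrors the RetrainMode values in a local enum and maps each 1-based step to 'train', 'predict' (reuse the stored artifact) or 'finetune'. The function `plan_steps` is hypothetical, and the assumption that finetune also reuses preprocessing artifacts is ours; the descriptions above only say that it continues training the existing model.

```python
from enum import Enum
from typing import Dict, List


class Mode(str, Enum):
    """Local stand-in mirroring the documented RetrainMode values."""
    FULL = "full"
    TRANSFER = "transfer"
    FINETUNE = "finetune"


def plan_steps(mode: Mode, step_kinds: List[str]) -> Dict[int, str]:
    """Map each 1-based step index to 'train', 'predict' (reuse) or 'finetune'."""
    plan: Dict[int, str] = {}
    for index, kind in enumerate(step_kinds, start=1):
        if mode is Mode.FULL:
            plan[index] = "train"  # everything is fitted from scratch
        elif kind == "model":
            # transfer fits a new model; finetune keeps training the existing one
            plan[index] = "train" if mode is Mode.TRANSFER else "finetune"
        else:
            plan[index] = "predict"  # assumption: reuse stored preprocessing artifacts
    return plan
```

A string such as 'transfer' can be converted with `Mode("transfer")` before calling `plan_steps`.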
- class nirs4all.pipeline.Retrainer(runner: PipelineRunner)
Bases: object
Handles retraining pipelines with various modes.
This class manages the retrain workflow: loading saved pipelines, determining which steps to retrain vs. reuse, and executing the modified pipeline on new data.
- Phase 7 Implementation:
The Retrainer supports three modes: full (train from scratch with the same pipeline structure), transfer (use existing preprocessing, train a new model), and finetune (continue training the existing model).
- runner
Parent PipelineRunner instance
- resolver
Prediction resolver for loading sources
- extract(source: Dict[str, Any] | str | Path | Any, verbose: int = 0) → ExtractedPipeline
Extract a pipeline for inspection or modification.
Returns an ExtractedPipeline object that can be inspected, modified, and then executed with runner.run().
- Parameters:
source – Prediction source (dict, folder, Run, artifact_id, bundle)
verbose – Verbosity level
- Returns:
ExtractedPipeline for inspection/modification
Example
>>> extracted = retrainer.extract(best_pred)
>>> print(extracted.steps)
>>> extracted.steps[-1] = {"model": RandomForestRegressor()}
>>> preds, _ = runner.run(extracted.steps, new_data)
- retrain(source: Dict[str, Any] | str | Path | Any, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], mode: str | RetrainMode = 'full', dataset_name: str = 'retrain_dataset', new_model: Any | None = None, epochs: int | None = None, step_modes: List[StepMode] | None = None, verbose: int = 0, **kwargs) → Tuple[Predictions, Dict[str, Any]]
Retrain a pipeline on new data.
- Parameters:
source – Prediction source (dict, folder, Run, artifact_id, bundle)
dataset – New dataset to train on
mode – Retrain mode (‘full’, ‘transfer’, ‘finetune’)
dataset_name – Name for the dataset
new_model – Optional new model for transfer mode
epochs – Optional number of epochs for fine-tuning
step_modes – Optional per-step mode overrides
verbose – Verbosity level
**kwargs – Additional parameters (learning_rate, freeze_layers, etc.)
- Returns:
Tuple of (predictions, dataset_predictions_dict)
Example
>>> retrainer = Retrainer(runner)
>>>
>>> # Full retrain
>>> preds, _ = retrainer.retrain(best_pred, new_data, mode='full')
>>>
>>> # Transfer: use preprocessing, new model
>>> preds, _ = retrainer.retrain(best_pred, new_data, mode='transfer')
>>>
>>> # Finetune: continue training
>>> preds, _ = retrainer.retrain(best_pred, new_data, mode='finetune', epochs=10)
- class nirs4all.pipeline.SourceType(value)
Type of prediction source.
- PREDICTION
Dictionary from Predictions object
- FOLDER
Path to pipeline folder
- RUN
Run object (best prediction from run)
- ARTIFACT_ID
Direct artifact reference string
- BUNDLE
Exported .n4a bundle file
- TRACE_ID
Execution trace reference
- MODEL_FILE
Direct model file (.joblib, .pkl, .h5, .pt, etc.)
- UNKNOWN
Unrecognized source type
- ARTIFACT_ID = 'artifact_id'
- BUNDLE = 'bundle'
- FOLDER = 'folder'
- MODEL_FILE = 'model_file'
- PREDICTION = 'prediction'
- RUN = 'run'
- TRACE_ID = 'trace_id'
- UNKNOWN = 'unknown'
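Detecting the source type, as PredictionResolver.resolve does before delegating, can be approximated with simple checks on the input. This heuristic is a hypothetical sketch: it mirrors only a subset of the SourceType values (RUN, ARTIFACT_ID and TRACE_ID need library-specific knowledge and are left out), and the real resolver's rules may differ.

```python
from enum import Enum
from pathlib import Path
from typing import Any


class Source(str, Enum):
    """Local mirror of a subset of the documented SourceType values."""
    PREDICTION = "prediction"
    FOLDER = "folder"
    BUNDLE = "bundle"
    MODEL_FILE = "model_file"
    UNKNOWN = "unknown"


MODEL_SUFFIXES = {".joblib", ".pkl", ".h5", ".pt"}


def guess_source_type(source: Any) -> Source:
    """Heuristic classification of a prediction source (sketch, not the nirs4all resolver)."""
    if isinstance(source, dict):
        return Source.PREDICTION  # e.g. an entry taken from a Predictions object
    if isinstance(source, (str, Path)):
        path = Path(source)
        suffix = path.suffix.lower()
        if suffix == ".n4a":
            return Source.BUNDLE
        if suffix in MODEL_SUFFIXES:
            return Source.MODEL_FILE
        if path.is_dir():
            return Source.FOLDER
    return Source.UNKNOWN
```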
- class nirs4all.pipeline.StepArtifacts(artifact_ids: List[str] = <factory>, primary_artifact_id: str | None = None, fold_artifact_ids: Dict[int, str] = <factory>, primary_artifacts: Dict[str, str] = <factory>, by_branch: Dict[Tuple[int, ...], List[str]] = <factory>, by_source: Dict[int, List[str]] = <factory>, by_chain: Dict[str, str] = <factory>, metadata: Dict[str, Any] = <factory>)
Bases: object
Artifacts produced by a single step (V3).
Records all artifacts created during step execution, with V3 indexes for efficient lookup by chain path, branch, source, and fold.
- add_artifact(artifact_id: str, is_primary: bool = False, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None) → None
Add an artifact ID to this step’s artifacts (V3).
- Parameters:
artifact_id – The artifact ID to add
is_primary – Whether this is the primary artifact
chain_path – V3 operator chain path
branch_path – Branch path for indexing
source_index – Source index for multi-source indexing
- add_fold_artifact(fold_id: int, artifact_id: str, chain_path: str | None = None, branch_path: List[int] | None = None) → None
Add a fold-specific artifact.
- Parameters:
fold_id – CV fold index
artifact_id – Artifact ID for this fold
chain_path – V3 operator chain path
branch_path – Branch path for indexing
- classmethod from_dict(data: Dict[str, Any]) → StepArtifacts
Create StepArtifacts from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
StepArtifacts instance
- get_artifact_by_chain(chain_path: str) → str | None
Get artifact ID by exact chain path match.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifacts_for_branch(branch_path: List[int]) → List[str]
Get artifact IDs matching a branch path.
Includes artifacts from an exact branch match, from the empty branch (shared, pre-branch artifacts), and from parent branches (for nested branches).
- Parameters:
branch_path – Target branch path
- Returns:
List of matching artifact IDs
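The matching rule above (exact branch, shared empty branch, parent branches) amounts to a prefix test on branch paths. A minimal sketch over a by_branch-style index, assuming that "parent branch" means any prefix of the target path; the function name `artifacts_for_branch` is hypothetical.

```python
from typing import Dict, List, Tuple


def artifacts_for_branch(by_branch: Dict[Tuple[int, ...], List[str]],
                         branch_path: List[int]) -> List[str]:
    """Collect artifact IDs visible from branch_path (sketch of the documented rule)."""
    target = tuple(branch_path)
    matches: List[str] = []
    for path, artifact_ids in by_branch.items():
        # A stored path matches if it is the target itself, the empty (shared) path,
        # or a parent of the target; all three cases are prefixes of the target.
        if target[: len(path)] == path:
            matches.extend(artifact_ids)
    return matches
```

Sibling branches never match, because neither is a prefix of the other.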
- get_artifacts_for_source(source_index: int) → List[str]
Get artifact IDs for a specific source.
- Parameters:
source_index – Source index to filter
- Returns:
List of artifact IDs for that source
- merge(other: StepArtifacts) → None
Merge another StepArtifacts into this one.
Used when multiple substeps share the same step_index and their artifacts need to be combined in the artifact_map.
- Parameters:
other – StepArtifacts to merge into this one
- class nirs4all.pipeline.StepMode(step_index: int, mode: str = 'train', artifact_id: str | None = None, kwargs: Dict[str, Any] = <factory>)
Bases: object
Mode override for a specific step during retraining.
Enables fine-grained control over which steps train vs. use existing artifacts.
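StepMode overrides layer on top of the global retrain mode. The dataclass below copies the documented StepMode fields into a local stand-in, and `apply_overrides` (hypothetical) shows the assumed precedence: an explicit per-step mode wins over the plan derived from the global mode.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StepOverride:
    """Local stand-in with the same fields as the documented StepMode."""
    step_index: int
    mode: str = "train"
    artifact_id: Optional[str] = None
    kwargs: Dict[str, Any] = field(default_factory=dict)


def apply_overrides(plan: Dict[int, str], overrides: List[StepOverride]) -> Dict[int, str]:
    """Return a copy of plan in which explicit per-step modes replace the global choice."""
    result = dict(plan)  # leave the caller's plan untouched
    for override in overrides:
        result[override.step_index] = override.mode
    return result
```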
- class nirs4all.pipeline.TargetResolver(workspace_path: Path)
Bases: object
Resolves prediction targets for predict mode.
Focused responsibility: Finding and resolving prediction targets by ID.
Note: For the comprehensive Phase 3 resolver that handles multiple source types (prediction dict, folder, Run, artifact_id, bundle), see nirs4all.pipeline.resolver.PredictionResolver.
- find_best_for_config(config_path: str) → Dict[str, Any] | None
Find the best prediction for a given config path.
- Parameters:
config_path – Path to pipeline configuration
- Returns:
Best prediction metadata, or None if not found
- find_prediction_by_id(prediction_id: str) → Dict[str, Any] | None
Search for a prediction by ID in global predictions databases.
Uses direct ID filtering for O(1) lookup per file instead of O(N) iteration.
- Parameters:
prediction_id – Unique prediction identifier
- Returns:
Prediction metadata dict, or None if not found
- resolve_target(prediction_obj: Dict[str, Any] | str) → tuple[str, Dict[str, Any] | None]
Resolve prediction object to config path and model metadata.
- Parameters:
prediction_obj – Either a dict with ‘config_path’ and optional model metadata, or a string holding a config path or prediction ID
- Returns:
Tuple of (config_path, target_model_metadata)
- Raises:
ValueError – If prediction ID not found or invalid input
- class nirs4all.pipeline.TraceBasedExtractor(include_skipped: bool = False, preserve_order: bool = True)
Bases: object
Extract minimal pipeline from execution trace.
The extractor analyzes an ExecutionTrace to determine which steps are needed for prediction replay and builds a MinimalPipeline with the correct artifact mappings.
The extractor is controller-agnostic: it uses trace metadata to identify steps without encoding knowledge of controller types.
- include_skipped
Whether to include skipped steps in minimal pipeline
- preserve_order
Whether to preserve original step order
Example
>>> extractor = TraceBasedExtractor()
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>> print(f"Minimal pipeline has {minimal.get_step_count()} steps")
- extract(trace: ExecutionTrace, full_pipeline: List[Any] | None = None, up_to_model: bool = True) → MinimalPipeline
Extract minimal pipeline from execution trace.
Analyzes the trace to determine which steps are needed for prediction and builds a MinimalPipeline with artifact mappings.
- Parameters:
trace – ExecutionTrace to extract from
full_pipeline – Optional full pipeline steps (for step configs)
up_to_model – If True, only include steps up to model step
- Returns:
MinimalPipeline with steps and artifact mappings
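The core selection logic of extract() can be sketched on a simplified trace. `TraceStep` and `minimal_step_indices` are hypothetical; the real ExecutionTrace and MinimalPipeline carry far more information (artifact mappings, branch paths, operator configs), and this sketch assumes the steps are already in execution order.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TraceStep:
    """Hypothetical, simplified record of one executed step."""
    step_index: int
    skipped: bool = False
    is_model: bool = False


def minimal_step_indices(steps: List[TraceStep], up_to_model: bool = True,
                         include_skipped: bool = False) -> List[int]:
    """Indices of the steps to replay, in execution order (sketch of the extract() idea)."""
    selected: List[int] = []
    for step in steps:
        if step.skipped and not include_skipped:
            continue  # skipped steps produced nothing that prediction needs
        selected.append(step.step_index)
        if up_to_model and step.is_model:
            break  # steps after the model are not needed to produce predictions
    return selected
```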
- extract_for_branch(trace: ExecutionTrace, branch_path: List[int], full_pipeline: List[Any] | None = None) → MinimalPipeline
Extract minimal pipeline for a specific branch.
Includes shared steps (before branching) plus branch-specific steps.
- Parameters:
trace – ExecutionTrace to extract from
branch_path – Branch path to extract (e.g., [0] for first branch)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps for the specified branch
- extract_for_branch_name(trace: ExecutionTrace, branch_name: str, full_pipeline: List[Any] | None = None) → MinimalPipeline
Extract minimal pipeline for a specific branch by name.
More reliable than extract_for_branch for nested branches where branch_id doesn’t map directly to branch_path. Uses branch_name for matching since it’s unique and stored in both predictions and trace.
Includes shared steps (before branching) plus branch-specific steps.
- Parameters:
trace – ExecutionTrace to extract from
branch_name – Branch name to match (e.g., “branch_0_branch_0”)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps for the specified branch
- extract_for_step(trace: ExecutionTrace, target_step_index: int, full_pipeline: List[Any] | None = None) → MinimalPipeline
Extract minimal pipeline up to a specific step.
Useful for partial prediction or when targeting a specific model in a multi-model pipeline.
- Parameters:
trace – ExecutionTrace to extract from
target_step_index – Target step index (inclusive)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps up to target
- get_required_artifact_ids(trace: ExecutionTrace, up_to_model: bool = True) → List[str]
Get list of artifact IDs required for prediction.
Useful for pre-loading artifacts or validating artifact availability.
- Parameters:
trace – ExecutionTrace to analyze
up_to_model – If True, only include artifacts up to model step
- Returns:
List of artifact IDs needed for prediction
- get_step_dependency_graph(trace: ExecutionTrace) → Dict[int, Set[int]]
Build dependency graph from execution trace.
The dependency graph maps each step to the set of steps it depends on. This is inferred from the trace execution order and branch structure.
- Parameters:
trace – ExecutionTrace to analyze
- Returns:
Dictionary mapping step_index to set of dependency step indices
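Inferring dependencies from execution order plus branch structure can be sketched as follows. The rule used here (a step depends on every earlier step whose branch path is a prefix of its own, so shared steps feed all branches while sibling branches stay independent) is an assumption chosen for illustration, as is the requirement that step indices are unique; the function name is hypothetical.

```python
from typing import Dict, List, Set, Tuple


def dependency_graph(steps: List[Tuple[int, Tuple[int, ...]]]) -> Dict[int, Set[int]]:
    """Map each step to the earlier steps it depends on (illustrative sketch).

    steps holds (step_index, branch_path) pairs in execution order.
    """
    graph: Dict[int, Set[int]] = {}
    seen: List[Tuple[int, Tuple[int, ...]]] = []
    for index, path in steps:
        # Earlier steps on the same branch, a parent branch, or the shared trunk.
        graph[index] = {prev for prev, prev_path in seen if path[: len(prev_path)] == prev_path}
        seen.append((index, path))
    return graph
```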
- validate_trace_for_prediction(trace: ExecutionTrace) → Tuple[bool, List[str]]
Validate that a trace has all information needed for prediction.
Checks that the model step is recorded, that all steps up to the model have recorded artifacts (where applicable), and that no critical information is missing.
- Parameters:
trace – ExecutionTrace to validate
- Returns:
Tuple of (is_valid, list of issues)
- class nirs4all.pipeline.TraceRecorder(pipeline_uid: str = '', pipeline_id: str = '', metadata: Dict[str, Any] | None = None)
Bases: object
Records execution traces during pipeline execution (V3).
Builds an ExecutionTrace by recording step starts, artifact creations, and step completions. Designed for use within the pipeline executor.
V3 improvements: the recorder maintains a chain stack for tracking the full operator chain and a branch stack for automatic branch path management, tracks the source index for multi-source pipelines, and records branch substeps individually.
- trace
The ExecutionTrace being built
- current_step
The step currently being executed
- step_start_time
Time when current step started (for duration)
- pipeline_id
Pipeline identifier for chain generation
Example
>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> recorder.record_artifact(artifact_id="0001$abc123:all", chain_path="s1.SNV")
>>> recorder.end_step()
>>> recorder.enter_branch(0)
>>> recorder.start_step(step_index=3, operator_type="model", operator_class="PLS")
>>> recorder.record_artifact(artifact_id="0001$def456:0", chain_path="s1.SNV>s3.PLS[br=0]")
>>> recorder.end_step(is_model=True)
>>> recorder.exit_branch()
>>> trace = recorder.finalize(preprocessing_chain="SNV")
- add_step_metadata(key: str, value: Any) → None
Add metadata to the current step.
- Parameters:
key – Metadata key
value – Metadata value
- build_chain_for_artifact(step_index: int, operator_class: str, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None) → OperatorChain
Build an operator chain for an artifact.
Creates a chain based on current context plus the specified operator.
- Parameters:
step_index – Step index of the operator
operator_class – Class name of the operator
source_index – Source index for multi-source
fold_id – Fold ID for CV models
substep_index – Substep index within step
- Returns:
OperatorChain for the artifact
- current_branch_path() → List[int]
Get current branch path.
- Returns:
Copy of current branch path
- current_chain() → OperatorChain
Get current operator chain without modifying stack.
- Returns:
Current OperatorChain
- end_step(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) → None
End the current step and add it to the trace.
- Parameters:
is_model – Whether this is the model step
fold_weights – Per-fold weights for CV models
skip_trace – If True, don’t add this step to the trace
- enter_branch(branch_id: int) → List[int]
Enter a branch context.
- Parameters:
branch_id – Branch index to enter
- Returns:
New branch path after entering
- exit_branch() → List[int]
Exit current branch context.
- Returns:
The exited branch path
- Raises:
RuntimeError – If not in a branch context
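enter_branch and exit_branch behave like a stack of branch indices. A minimal sketch of that bookkeeping, assuming that exiting returns the full path as it was just before leaving; `BranchStack` is a hypothetical class, not part of TraceRecorder.

```python
from typing import List


class BranchStack:
    """Minimal branch-path bookkeeping in the spirit of enter_branch()/exit_branch()."""

    def __init__(self) -> None:
        self._path: List[int] = []

    def enter(self, branch_id: int) -> List[int]:
        self._path.append(branch_id)
        return list(self._path)  # copy of the new path after entering

    def exit(self) -> List[int]:
        if not self._path:
            raise RuntimeError("Not in a branch context")
        exited = list(self._path)  # path of the branch being left
        self._path.pop()
        return exited

    def current(self) -> List[int]:
        return list(self._path)  # copy, so callers cannot mutate internal state
```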
- finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) → ExecutionTrace
Finalize and return the completed trace.
- Parameters:
preprocessing_chain – Summary string of preprocessing
metadata – Additional metadata to merge
- Returns:
The completed ExecutionTrace
- get_current_step_index() → int | None
Get the current step index.
- Returns:
Current step index or None if no step active
- has_model_step() → bool
Check if a model step has been recorded.
- Returns:
True if model step index is set
- mark_step_skipped(step_index: int) → None
Record that a step was skipped.
- Parameters:
step_index – Index of the skipped step
- pop_chain() → OperatorChain
Pop and return the current chain.
- Returns:
The popped OperatorChain
- Raises:
RuntimeError – If trying to pop the root chain
- push_chain(node: OperatorNode) → OperatorChain
Push new node onto the chain stack.
Creates a new chain with the node appended and pushes it.
- Parameters:
node – OperatorNode to append
- Returns:
The new extended chain
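push_chain and pop_chain maintain a stack of progressively longer operator chains. The sketch below models a chain as a tuple of (step_index, operator_class, branch_path) nodes and renders it in the 's1.SNV>s3.PLS[br=0]' style seen in the TraceRecorder example. The class name `ChainStack`, the node tuple, and the comma-separated format for nested branch paths are assumptions, not the OperatorChain implementation.

```python
from typing import List, Optional, Tuple

Node = Tuple[int, str, Optional[Tuple[int, ...]]]  # (step_index, operator_class, branch_path)


class ChainStack:
    """Each push stores a new, longer chain; pop refuses to remove the root chain."""

    def __init__(self) -> None:
        self._stack: List[Tuple[Node, ...]] = [()]  # the root chain is empty

    def push(self, node: Node) -> Tuple[Node, ...]:
        chain = self._stack[-1] + (node,)  # tuples are immutable, so earlier chains are kept
        self._stack.append(chain)
        return chain

    def pop(self) -> Tuple[Node, ...]:
        if len(self._stack) == 1:
            raise RuntimeError("Cannot pop the root chain")
        return self._stack.pop()

    @staticmethod
    def to_path(chain: Tuple[Node, ...]) -> str:
        parts = []
        for step, cls, branch in chain:
            suffix = f"[br={','.join(map(str, branch))}]" if branch else ""
            parts.append(f"s{step}.{cls}{suffix}")
        return ">".join(parts)
```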
- record_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) → None
Record an artifact created during the current step (V3).
- Parameters:
artifact_id – The artifact ID
is_primary – Whether this is the primary artifact
fold_id – CV fold ID if fold-specific artifact
chain_path – V3 operator chain path
branch_path – Branch path for indexing
source_index – Source index for multi-source
metadata – Additional artifact metadata
- record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) → None
Record input shapes for the current step.
- Parameters:
input_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
- record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) → None
Record output shapes for the current step.
- Parameters:
output_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
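The two shape conventions above can be related under one common layout assumption: each source is a 3D array (samples, processings, features), and the 2D view concatenates processings, then sources, along the feature axis. This is a hypothetical sketch for intuition only; this reference does not state how the 2D layout is actually built.

```python
from typing import List, Tuple

import numpy as np


def to_2d(x3d: np.ndarray) -> np.ndarray:
    """Flatten (samples, processings, features) into (samples, processings * features)."""
    n_samples, n_proc, n_feat = x3d.shape
    return x3d.reshape(n_samples, n_proc * n_feat)


def describe_shapes(sources: List[np.ndarray]) -> Tuple[Tuple[int, int], List[Tuple[int, ...]]]:
    """Return (input_shape, features_shape) in the style of record_input_shapes (hypothetical)."""
    features_shape = [tuple(x.shape) for x in sources]  # one 3D shape per source
    width = sum(to_2d(x).shape[1] for x in sources)     # sources placed side by side
    return (sources[0].shape[0], width), features_shape
```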
- reset_chain_to(chain: OperatorChain) → None
Reset chain stack to a specific chain.
Useful when entering a new branch context.
- Parameters:
chain – Chain to reset to
- start_branch_step(step_index: int, branch_count: int, operator_config: Dict[str, Any] | None = None) → ExecutionStep
Start recording a branch step.
- Parameters:
step_index – Step index of the branch
branch_count – Number of branches
operator_config – Branch configuration
- Returns:
The created ExecutionStep for the branch
- start_branch_substep(parent_step_index: int, branch_id: int, operator_type: str, operator_class: str, substep_index: int = 0, operator_config: Dict[str, Any] | None = None, branch_name: str | None = None) → ExecutionStep
Start recording a substep within a branch.
Note: This method assumes enter_branch() has already been called for this branch, so current_branch_path() already includes the branch_id.
- Parameters:
parent_step_index – Parent branch step index
branch_id – Branch index this substep belongs to (for metadata only)
operator_type – Type of operator
operator_class – Class name of operator
substep_index – Index within the branch’s substeps
operator_config – Operator configuration
branch_name – Human-readable branch name
- Returns:
The created ExecutionStep
- start_step(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, branch_path: List[int] | None = None, branch_name: str = '', source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None) → ExecutionStep
Start recording a new step (V3).
- Parameters:
step_index – 1-based step index
operator_type – Type of operator (e.g., “transform”, “model”)
operator_class – Class name of operator
operator_config – Serialized operator configuration
execution_mode – Train/predict/skip mode
branch_path – Branch indices (uses current if None)
branch_name – Human-readable branch name
source_count – Number of X sources at this step
produces_branches – Whether this is a branch operator
substep_index – Index within substep
- Returns:
The created ExecutionStep
- class nirs4all.pipeline.WorkspaceExporter(workspace_path: Path)
Bases: object
Exports best results to workspace exports/ folder.
Focused responsibility: Export functionality for best pipelines.
- export_best_for_dataset(dataset_name: str, runs_dir: Path, mode: str = 'predictions') → Path | None
Export best results for a dataset to exports/ folder.
- Parameters:
dataset_name – Dataset name
runs_dir – Runs directory path
mode – Export mode: one of “predictions”, “template”, “trained”, or “full”
- Returns:
Path to export directory, or None if no predictions found
- export_best_prediction(predictions_file: Path, dataset_name: str, run_date: str = None, pipeline_id: str = None, custom_name: str | None = None) → Path
Export predictions CSV to best_predictions/ folder.
- Parameters:
predictions_file – Path to predictions.csv
dataset_name – Dataset name
run_date – Run date (deprecated; kept for backward compatibility)
pipeline_id – Pipeline identifier
custom_name – Optional custom name for export
- Returns:
Path to exported CSV
- export_pipeline_full(pipeline_dir: Path, dataset_name: str, run_date: str = None, custom_name: str | None = None) → Path
Export full pipeline results to flat structure.
- Parameters:
pipeline_dir – Path to pipeline (NNNN_hash/)
dataset_name – Dataset name
run_date – Run date in YYYYMMDD format (deprecated; kept for backward compatibility)
custom_name – Optional custom name for export
- Returns:
Path to exported directory