"""Custom log handlers for nirs4all logging.
This module provides specialized handlers for throttled progress updates,
file rotation, and other custom logging behaviors.
"""
from __future__ import annotations
import gzip
import logging
import os
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path
from threading import Lock
from typing import Any, Optional
[docs]
class ThrottledHandler(logging.Handler):
"""Handler that throttles progress messages to avoid flooding.
Uses time-based and percentage-based throttling to limit progress
updates while still reporting important milestones.
"""
# Milestone percentages to always report
MILESTONES = {10, 25, 50, 75, 90, 100}
def __init__(
self,
base_handler: logging.Handler,
min_interval: float = 5.0,
) -> None:
"""Initialize throttled handler.
Args:
base_handler: Handler to forward non-throttled messages to.
min_interval: Minimum seconds between progress updates.
"""
super().__init__()
self.base_handler = base_handler
self.min_interval = min_interval
self._last_progress_time: float = 0
self._last_percentage: int = -1
self._lock = Lock()
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Emit a log record, throttling progress messages.
Args:
record: Log record to potentially emit.
"""
# Check if this is a progress message
is_progress = getattr(record, "is_progress", False)
if not is_progress:
# Non-progress messages pass through immediately
self.base_handler.emit(record)
return
# Throttle progress messages
with self._lock:
current_time = time.time()
percentage = getattr(record, "percentage", None)
is_best = getattr(record, "is_new_best", False)
should_emit = False
# Always emit if it's a new best result
if is_best:
should_emit = True
# Always emit milestones
elif percentage is not None and int(percentage) in self.MILESTONES:
if int(percentage) != self._last_percentage:
should_emit = True
self._last_percentage = int(percentage)
# Time-based throttle
elif current_time - self._last_progress_time >= self.min_interval:
should_emit = True
if should_emit:
self._last_progress_time = current_time
self.base_handler.emit(record)
[docs]
def reset(self) -> None:
"""Reset throttle state for a new operation."""
with self._lock:
self._last_progress_time = 0
self._last_percentage = -1
[docs]
def close(self) -> None:
"""Close this handler and the base handler."""
try:
if self.base_handler is not None:
self.base_handler.close()
except Exception:
pass
super().close()
[docs]
class RotatingRunFileHandler(logging.Handler):
"""Handler that writes logs to run-specific files with rotation.
Creates a new log file for each run, with optional rotation to
limit total log storage. Supports both count-based and age-based
rotation policies.
Features:
- Separate log files per run
- Count-based rotation (max_runs)
- Age-based rotation (max_age_days)
- Size-based rotation (max_bytes)
- Optional gzip compression for rotated logs
- Optional JSON Lines output
"""
def __init__(
self,
log_dir: Path,
run_id: str,
max_runs: int = 100,
max_age_days: Optional[int] = 30,
max_bytes: Optional[int] = None,
compress_rotated: bool = True,
json_output: bool = False,
) -> None:
"""Initialize rotating file handler.
Args:
log_dir: Directory for log files.
run_id: Unique run identifier.
max_runs: Maximum number of run logs to keep.
max_age_days: Maximum age of logs in days (None to disable).
max_bytes: Maximum size of a single log file before rotation (None to disable).
compress_rotated: Whether to gzip old log files.
json_output: If True, also write JSON Lines file.
"""
super().__init__()
self.log_dir = Path(log_dir)
self.run_id = run_id
self.max_runs = max_runs
self.max_age_days = max_age_days
self.max_bytes = max_bytes
self.compress_rotated = compress_rotated
self.json_output = json_output
self._lock = Lock()
self._rotation_counter = 0
# Create log directory
self.log_dir.mkdir(parents=True, exist_ok=True)
# Create file handlers
self._log_file = self.log_dir / f"{run_id}.log"
self._log_handle = open(self._log_file, "w", encoding="utf-8")
self._current_size = 0
if json_output:
self._json_file = self.log_dir / f"{run_id}.jsonl"
self._json_handle = open(self._json_file, "w", encoding="utf-8")
else:
self._json_handle = None
self._json_file = None
# Rotate old logs
self._rotate_logs()
def _rotate_logs(self) -> None:
"""Remove oldest log files if over limit.
Applies both count-based and age-based rotation policies.
"""
now = datetime.now()
# Get all log files
log_files = list(self.log_dir.glob("*.log"))
log_files.extend(self.log_dir.glob("*.log.gz"))
# Sort by modification time (oldest first)
log_files = sorted(
log_files,
key=lambda p: p.stat().st_mtime,
)
files_to_remove: list[Path] = []
# Age-based rotation
if self.max_age_days is not None:
cutoff = now - timedelta(days=self.max_age_days)
for log_file in log_files:
try:
mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
if mtime < cutoff:
files_to_remove.append(log_file)
except OSError:
pass
# Count-based rotation (excluding current file)
current_files = [f for f in log_files if f not in files_to_remove]
if len(current_files) >= self.max_runs:
# Remove oldest files to get under limit
excess = len(current_files) - self.max_runs + 1 # +1 for current
files_to_remove.extend(current_files[:excess])
# Remove files and their JSON companions
for log_file in set(files_to_remove):
try:
log_file.unlink()
# Also remove corresponding JSON file if exists
json_file = log_file.with_suffix(".jsonl")
if json_file.exists():
json_file.unlink()
json_gz = log_file.with_suffix(".jsonl.gz")
if json_gz.exists():
json_gz.unlink()
except OSError:
pass
def _rotate_current_file(self) -> None:
"""Rotate the current log file when size limit is reached."""
if self._log_handle is None:
return
with self._lock:
# Close current file
self._log_handle.close()
# Rename with rotation counter
self._rotation_counter += 1
rotated_name = self._log_file.with_suffix(f".{self._rotation_counter}.log")
self._log_file.rename(rotated_name)
# Compress if enabled
if self.compress_rotated:
self._compress_file(rotated_name)
# Open new file
self._log_handle = open(self._log_file, "w", encoding="utf-8")
self._current_size = 0
def _compress_file(self, file_path: Path) -> None:
"""Compress a file with gzip.
Args:
file_path: Path to file to compress.
"""
try:
gz_path = file_path.with_suffix(file_path.suffix + ".gz")
with open(file_path, "rb") as f_in:
with gzip.open(gz_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
file_path.unlink()
except Exception:
pass # Keep uncompressed if compression fails
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Write log record to file(s).
Args:
record: Log record to write.
"""
try:
# Format for human-readable log
msg = self.format(record)
with self._lock:
# Check size limit before writing
msg_bytes = len(msg.encode("utf-8")) + 1 # +1 for newline
if self.max_bytes and self._current_size + msg_bytes > self.max_bytes:
self._rotate_current_file()
self._log_handle.write(msg + "\n")
self._log_handle.flush()
self._current_size += msg_bytes
# Format for JSON log if enabled
if self._json_handle is not None:
from .formatters import JsonFormatter
json_formatter = JsonFormatter(run_id=self.run_id)
json_msg = json_formatter.format(record)
self._json_handle.write(json_msg + "\n")
self._json_handle.flush()
except Exception:
self.handleError(record)
[docs]
def close(self) -> None:
"""Close file handles."""
with self._lock:
try:
if self._log_handle is not None:
self._log_handle.close()
self._log_handle = None
if self._json_handle is not None:
self._json_handle.close()
self._json_handle = None
except Exception:
pass
super().close()
[docs]
def get_log_file_path(self) -> Path:
"""Get the path to the current log file.
Returns:
Path to the current log file.
"""
return self._log_file
[docs]
def get_json_file_path(self) -> Optional[Path]:
"""Get the path to the current JSON log file.
Returns:
Path to JSON log file, or None if not enabled.
"""
return self._json_file
[docs]
class NullHandler(logging.Handler):
"""Handler that discards all log records.
Used when logging should be completely silent.
"""
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Discard the log record.
Args:
record: Log record (ignored).
"""
pass
[docs]
class BufferedHandler(logging.Handler):
"""Handler that buffers log records for batch processing.
Useful for collecting logs during a phase and outputting them
together, e.g., for branch comparison summaries.
"""
def __init__(self, max_size: int = 1000) -> None:
"""Initialize buffered handler.
Args:
max_size: Maximum number of records to buffer.
"""
super().__init__()
self.max_size = max_size
self._buffer: list[logging.LogRecord] = []
self._lock = Lock()
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Buffer the log record.
Args:
record: Log record to buffer.
"""
with self._lock:
if len(self._buffer) < self.max_size:
self._buffer.append(record)
[docs]
def flush_to(self, handler: logging.Handler) -> None:
"""Flush buffered records to another handler.
Args:
handler: Handler to send buffered records to.
"""
with self._lock:
for record in self._buffer:
handler.emit(record)
self._buffer.clear()
[docs]
def get_records(self) -> list[logging.LogRecord]:
"""Get buffered records.
Returns:
List of buffered log records.
"""
with self._lock:
return list(self._buffer)
[docs]
def clear(self) -> None:
"""Clear the buffer."""
with self._lock:
self._buffer.clear()