# Source code for tseda.decomposition.automatic_grouping_heuristic
"""Automatic SSA component grouping heuristics."""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
# [docs]
@dataclass(slots=True)
class AutomaticGroupingHeuristic:
"""Suggest SSA grouping labels from the eigenvalue spectrum.
Components explaining at least ``variance_threshold`` of total variance are
classified as either trend or seasonality. Near-equal adjacent pairs are
treated as seasonal pairs; the remaining eligible components are treated as
trend. All other components fall into noise.
"""
eigenvalues: np.ndarray
variance_threshold: float = 0.10
pair_similarity_tolerance: float = 0.05
def __post_init__(self) -> None:
values = np.asarray(self.eigenvalues, dtype=float)
if values.ndim != 1:
raise ValueError("eigenvalues must be a one-dimensional array.")
if np.any(values < 0):
raise ValueError("eigenvalues must be non-negative.")
self.eigenvalues = values
@property
def explained_variance_ratios(self) -> np.ndarray:
"""Return per-component explained variance ratios."""
total_variance = float(np.sum(self.eigenvalues))
if total_variance <= 0:
return np.zeros_like(self.eigenvalues, dtype=float)
return self.eigenvalues / total_variance
[docs]
def is_near_equal_pair(self, left_index: int, right_index: int) -> bool:
"""Return True when two eigenvalues differ by at most the configured tolerance."""
left_value = float(self.eigenvalues[left_index])
right_value = float(self.eigenvalues[right_index])
larger = max(left_value, right_value)
smaller = min(left_value, right_value)
if larger <= 0:
return False
return ((larger - smaller) / larger) <= self.pair_similarity_tolerance
[docs]
def has_seasonal_pair(self, max_components: int | None = None) -> bool:
"""Return True when any adjacent eligible pair satisfies the similarity rule."""
eligible = self.eligible_component_indices()
if max_components is not None:
eligible = [index for index in eligible if index < max_components]
for offset in range(len(eligible) - 1):
left_index = eligible[offset]
right_index = eligible[offset + 1]
if right_index == left_index + 1 and self.is_near_equal_pair(left_index, right_index):
return True
return False
[docs]
def eligible_component_indices(self) -> list[int]:
"""Return component indices meeting the minimum explained-variance threshold."""
ratios = self.explained_variance_ratios
return [index for index, ratio in enumerate(ratios) if float(ratio) >= self.variance_threshold]
[docs]
def suggest_reconstruction(self) -> dict[str, list[int]]:
"""Return a trend/seasonality/noise grouping suggestion."""
eligible = self.eligible_component_indices()
trend_indices: list[int] = []
seasonality_indices: list[int] = []
cursor = 0
while cursor < len(eligible):
left_index = eligible[cursor]
right_index = eligible[cursor + 1] if cursor + 1 < len(eligible) else None
if (
right_index is not None
and right_index == left_index + 1
and self.is_near_equal_pair(left_index, right_index)
):
seasonality_indices.extend([left_index, right_index])
cursor += 2
continue
trend_indices.append(left_index)
cursor += 1
assigned = set(trend_indices) | set(seasonality_indices)
noise_indices = [index for index in range(len(self.eigenvalues)) if index not in assigned]
return {
"Trend": trend_indices,
"Seasonality": seasonality_indices,
"Noise": noise_indices,
}
[docs]
def suggest_next_expansion(
self, current: dict[str, list[int]]
) -> tuple[dict[str, list[int]], bool]:
"""Expand the current assignment by one step from the noise pool.
Takes the lowest-index (highest-eigenvalue) component in the noise pool.
If it and its immediate successor form a near-equal adjacent pair, both are
added to seasonality; otherwise the single component is added to trend.
Returns:
A tuple of the updated assignment dict and True when an expansion was
made, or (current, False) when the noise pool is exhausted.
"""
noise_pool = sorted(current.get("Noise", []))
if not noise_pool:
return current, False
new_assignment = {k: list(v) for k, v in current.items()}
candidate = noise_pool[0]
next_candidate = noise_pool[1] if len(noise_pool) > 1 else None
if (
next_candidate is not None
and next_candidate == candidate + 1
and self.is_near_equal_pair(candidate, next_candidate)
):
new_assignment["Seasonality"] = sorted(
new_assignment.get("Seasonality", []) + [candidate, next_candidate]
)
new_assignment["Noise"] = [
i for i in noise_pool if i not in {candidate, next_candidate}
]
else:
new_assignment["Trend"] = sorted(
new_assignment.get("Trend", []) + [candidate]
)
new_assignment["Noise"] = [i for i in noise_pool if i != candidate]
return new_assignment, True