import numpy as np
import plotly.graph_objects as go
try:
from ripser import ripser
except ImportError as exc:
raise ImportError("ripser.py is required for TDA analysis. Install with: pip install ripser") from exc
# Persistent homology (ripser.py) of a Takens delay embedding of the daily
# commit series. `observed_daily` comes from earlier in the file; it is sliced
# and `.reshape` is called on it, so it is presumably a 1-D NumPy array.
embed_dim = 3
if len(observed_daily) < embed_dim:
    # Too short for even one delay window: use the raw values as 1-D points.
    point_cloud = observed_daily.reshape(-1, 1)
else:
    # One row per window of `embed_dim` consecutive days, stepping by one day.
    point_cloud = np.array(
        [
            observed_daily[end - embed_dim:end]
            for end in range(embed_dim, len(observed_daily) + 1)
        ],
        dtype=float,
    )

# Diagrams are requested up to dimension 1 and read by homology dimension.
dgms = ripser(point_cloud, maxdim=1)["dgms"]
h0 = dgms[0]
# Fall back to an empty (0, 2) diagram if no H1 diagram was returned.
h1 = np.empty((0, 2)) if len(dgms) <= 1 else dgms[1]
def finite_lifetimes(diagram: np.ndarray) -> np.ndarray:
    """Return the lifetimes (death - birth) of the bounded bars in a diagram.

    Args:
        diagram: Array whose column 0 holds births and column 1 holds deaths.

    Returns:
        1-D array with one non-negative lifetime per row whose death value is
        finite, in row order. Empty when the diagram is empty or no row has a
        finite death.
    """
    if diagram.size == 0:
        return np.array([], dtype=float)

    is_bounded = np.isfinite(diagram[:, 1])
    if not is_bounded.any():
        return np.array([], dtype=float)

    bounded = diagram[is_bounded]
    spans = bounded[:, 1] - bounded[:, 0]
    # Clamp so a death recorded before its birth never yields a negative value.
    return np.maximum(spans, 0.0)
# Kernel bandwidth for the forecaster: the median finite persistence lifetime
# across H0 and H1, floored at 1e-6 so it can safely be used as a divisor.
lifetimes = np.concatenate([finite_lifetimes(h0), finite_lifetimes(h1)])
if lifetimes.size == 0:
    # No finite bars at all: fall back to a unit bandwidth.
    lifetimes = np.array([1.0], dtype=float)
tda_scale = max(float(np.median(lifetimes)), 1e-6)

# Fit a local state-space model on the embedded points for extrapolation
if len(observed_daily) >= embed_dim + 1:
    # Each training row is a window of `embed_dim` consecutive days; its target
    # is the value on the day immediately after the window.
    X_train = np.array(
        [observed_daily[i:i + embed_dim] for i in range(len(observed_daily) - embed_dim)],
        dtype=float,
    )
    y_train = np.asarray(observed_daily[embed_dim:], dtype=float)

    simulated_daily = observed_daily.tolist()
    future_days = len(all_dates) - len(observed_dates)
    for _ in range(future_days):
        state = np.array(simulated_daily[-embed_dim:], dtype=float)
        distances = np.linalg.norm(X_train - state, axis=1)
        # Shift by the smallest distance before exponentiating. The normalised
        # weights are mathematically unchanged, but the nearest window always
        # gets weight 1, so a small bandwidth can no longer underflow every
        # weight to 0 and silently turn the forecast into the global mean.
        weights = np.exp(-(distances - distances.min()) / tda_scale)
        weight_sum = float(weights.sum())
        # The mean fallback is kept only as a guard for non-finite distances
        # (NaN makes the comparison false).
        next_value = (
            float(np.dot(weights, y_train) / weight_sum)
            if weight_sum > 0
            else float(y_train.mean())
        )
        # Daily commit counts cannot be negative.
        simulated_daily.append(max(next_value, 0.0))

    tda_cumulative = np.cumsum(np.array(simulated_daily, dtype=float))
else:
    # Not enough history for a single (window, target) pair: reuse the linear
    # forecast computed earlier in the file.
    tda_cumulative = linear_cumulative.copy()

expected_tda = float(tda_cumulative[-1])
print(f"Expected cumulative commits by {forecast_end_date.isoformat()} (tda/ripser-model): {expected_tda:.1f}")
# Plot 2a: TDA (ripser.py) model forecast
# Observed cumulative series (solid, with markers) overlaid with the TDA model
# curve (dotted) over the full date range.
tda_fig = go.Figure(
    data=[
        go.Scatter(
            x=observed_dates,
            y=observed_cumulative,
            mode="lines+markers",
            name="Observed cumulative",
            marker={"size": 5},
            line={"width": 2},
        ),
        go.Scatter(
            x=all_dates,
            y=tda_cumulative,
            mode="lines",
            name="TDA (ripser.py) model",
            line={"width": 2, "dash": "dot"},
        ),
    ]
)

# Dashed gray vertical marker at the last observed date.
tda_fig.add_vline(x=observed_end_date, line_width=1, line_dash="dash", line_color="gray")

tda_layout = {
    "title": f"TDA (ripser.py) extrapolation until {forecast_end_date.isoformat()}",
    "xaxis_title": "Date",
    "yaxis_title": "Cumulative commits since cutoff",
    "template": "plotly_white",
    "height": 460,
}
tda_fig.update_layout(**tda_layout)
tda_fig.show()
# Plot 2b: barcode plot (commit cadence topology)
barcode_fig = go.Figure()


def add_barcode(diagram: np.ndarray, y_offset: float, name: str, color: str) -> float:
    """Draw the finite bars of one persistence diagram onto `barcode_fig`.

    All bars of the diagram are drawn as a single trace made of horizontal
    segments separated by `None` gaps, so the legend entry toggles the whole
    barcode and the figure does not accumulate one trace per interval.

    Args:
        diagram: Array whose column 0 holds births and column 1 holds deaths.
        y_offset: Vertical position of the first bar.
        name: Legend label, also shown as the first line of the hover text.
        color: Line colour for every bar of this diagram.

    Returns:
        The y position at which the next diagram should start: `y_offset`
        unchanged for an empty diagram, otherwise `y_offset` plus the number of
        finite bars plus a 2-row gap.
    """
    if diagram.size == 0:
        return y_offset

    finite = diagram[np.isfinite(diagram[:, 1])]
    if len(finite) == 0:
        # Nothing drawable; add no trace so callers can still detect an empty
        # figure, but keep the same spacing as a diagram with zero bars.
        return y_offset + 2

    xs: list = []
    ys: list = []
    spans: list = []
    for i, (birth, death) in enumerate(finite):
        birth, death = float(birth), float(death)
        y = y_offset + i
        # Two endpoints followed by a None gap so consecutive bars stay apart.
        xs += [birth, death, None]
        ys += [y, y, None]
        # Hover variables are per point, so a bar's other endpoint is not
        # reachable through %{x}; carry both values in customdata instead.
        spans += [[birth, death]] * 3

    barcode_fig.add_trace(
        go.Scatter(
            x=xs,
            y=ys,
            mode="lines",
            line=dict(color=color, width=2),
            name=name,
            showlegend=True,
            connectgaps=False,
            customdata=spans,
            hovertemplate=(
                f"{name}<br>birth: %{{customdata[0]:.4f}}"
                "<br>death: %{customdata[1]:.4f}<extra></extra>"
            ),
        )
    )
    return y_offset + len(finite) + 2
# Stack the H0 bars first and the H1 bars above them; each call returns the
# vertical offset at which the next diagram starts.
y0 = 0.0
y0 = add_barcode(diagram=h0, y_offset=y0, name="H0 barcode", color="#4C78A8")
y0 = add_barcode(diagram=h1, y_offset=y0, name="H1 barcode", color="#F58518")

if not barcode_fig.data:
    # No trace was added: show a centred note instead of an empty plot.
    barcode_fig.add_annotation(
        text="No finite persistence intervals available",
        xref="paper",
        yref="paper",
        x=0.5,
        y=0.5,
        showarrow=False,
    )

barcode_layout = {
    "title": "Ripser barcode plot for commit cadence (Takens embedding)",
    "xaxis_title": "Filtration value",
    "yaxis_title": "Barcode index",
    "template": "plotly_white",
    "height": 420,
}
barcode_fig.update_layout(**barcode_layout)
barcode_fig.show()