"""
Cohort-time ATT estimation for staggered difference-in-differences designs.
This module implements treatment effect estimation for staggered adoption
settings where units begin treatment at different time periods. Each
(cohort, period) pair receives its own ATT estimate through cross-sectional
regression on transformed outcome data.
The estimation proceeds in three steps:
1. Transform outcomes by removing pre-treatment unit-specific patterns
(demeaning or detrending) for each treatment cohort.
2. For each treated cohort g in post-treatment period r, restrict the
sample to cohort g units plus valid control units.
3. Run cross-sectional regression of transformed outcomes on a treatment
indicator to estimate the ATT.
Valid control units for cohort g at time r include never-treated units
and units first treated after r. This rolling approach efficiently uses
not-yet-treated observations while maintaining identification under
conditional parallel trends and no anticipation assumptions.
Notes
-----
Three estimation methods are supported:
- Regression adjustment (RA): OLS estimation with optional covariate
adjustment. Enables exact t-based inference under classical linear
model assumptions.
- Inverse probability weighted regression adjustment (IPWRA): Doubly
robust estimation combining propensity score weighting with outcome
regression. Consistent if either model is correctly specified.
- Propensity score matching (PSM): Nearest-neighbor matching on
estimated propensity scores.
The cross-sectional regression for each (g, r) pair has the form:
.. math::
\\hat{Y}_{irg} = \\alpha + \\tau_{gr} D_{ig} + X_i \\beta + u_i
where :math:`\\hat{Y}_{irg}` is the transformed outcome, :math:`D_{ig}`
is the treatment indicator for cohort g, and :math:`\\tau_{gr}` is the
ATT for cohort g in period r.
"""
from __future__ import annotations
import warnings
from dataclasses import dataclass
from typing import Any
import numpy as np
import pandas as pd
import scipy.stats as stats
from .control_groups import (
ControlGroupStrategy,
get_valid_control_units,
)
from .transformations import get_cohorts, get_valid_periods_for_cohort
from .estimators import estimate_ipwra, estimate_psm, IPWRAResult, PSMResult
from ..exceptions import InvalidParameterError
[docs]
@dataclass
class CohortTimeEffect:
"""
Container for a single cohort-time treatment effect estimate.
Stores the ATT for treatment cohort g at calendar time r, along with
inference statistics from cross-sectional regression.
Attributes
----------
cohort : int
Treatment cohort identifier (first treatment period g).
period : int
Calendar time period r.
event_time : int
Event time relative to treatment onset (e = r - g).
att : float
Estimated average treatment effect on the treated.
se : float
Standard error of the ATT estimate.
ci_lower : float
Lower bound of the confidence interval.
ci_upper : float
Upper bound of the confidence interval.
t_stat : float
t-statistic for testing H0: ATT = 0.
pvalue : float
Two-sided p-value from t-distribution.
n_treated : int
Number of treated units in estimation sample.
n_control : int
Number of control units in estimation sample.
n_total : int
Total sample size (n_treated + n_control).
df_resid : int
Residual degrees of freedom. Default is 0.
df_inference : int
Degrees of freedom for inference. Default is 0.
"""
cohort: int
period: int
event_time: int
att: float
se: float
ci_lower: float
ci_upper: float
t_stat: float
pvalue: float
n_treated: int
n_control: int
n_total: int
df_resid: int = 0
df_inference: int = 0
# =============================================================================
# HC Standard Error Helper Functions
# =============================================================================
# Constants for numerical stability
_LEVERAGE_CLIP_MAX = 0.9999
_LEVERAGE_WARN_THRESHOLD = 0.9
def _compute_leverage(
X: np.ndarray,
XtX_inv: np.ndarray,
clip_max: float = _LEVERAGE_CLIP_MAX,
warn_threshold: float = _LEVERAGE_WARN_THRESHOLD,
) -> np.ndarray:
"""
Compute leverage values (hat matrix diagonal elements).
Uses an efficient vectorized method that avoids constructing the full
N×N hat matrix, reducing time complexity from O(N²×K) to O(N×K²).
Parameters
----------
X : np.ndarray
Design matrix of shape (N, K).
XtX_inv : np.ndarray
Inverse of X'X matrix of shape (K, K).
clip_max : float, default=0.9999
Upper bound for leverage value clipping to prevent division by zero.
warn_threshold : float, default=0.9
Threshold above which to warn about extreme leverage values.
Returns
-------
np.ndarray
Leverage values of shape (N,), clipped to [0, clip_max].
Notes
-----
The leverage value h_ii = x_i'(X'X)^{-1}x_i measures how much influence
observation i has on its own fitted value. Properties:
- 0 < h_ii < 1 for all i
- sum(h_ii) = K (number of parameters)
- Average leverage = K/N
- High leverage threshold typically 2K/N or 3K/N
The efficient computation uses:
h_ii = diag(X @ XtX_inv @ X.T) = (X @ XtX_inv * X).sum(axis=1)
"""
# Efficient computation avoiding N×N matrix
# h_ii = diag(H) where H = X @ XtX_inv @ X.T
# This is equivalent to: (X @ XtX_inv * X).sum(axis=1)
tmp = X @ XtX_inv # (N, K)
h_ii = (tmp * X).sum(axis=1) # (N,)
# Check for extreme leverage values
max_h = np.max(h_ii)
if max_h > warn_threshold:
warnings.warn(
f"Extreme leverage detected: max(h_ii) = {max_h:.4f} > {warn_threshold}. "
f"HC standard errors may be numerically unstable. "
f"Consider using HC4 or checking for influential observations.",
UserWarning
)
# Clip to ensure numerical stability (prevent division by zero in HC2-HC4)
h_ii = np.clip(h_ii, 0, clip_max)
return h_ii
def _compute_hc_variance(
X: np.ndarray,
residuals: np.ndarray,
XtX_inv: np.ndarray,
hc_type: str,
) -> np.ndarray:
"""
Compute heteroskedasticity-consistent (HC) variance-covariance matrix.
Implements the sandwich variance estimator for HC0 through HC4:
V̂ = (X'X)^{-1} X' Ω X (X'X)^{-1}
where Ω is a diagonal matrix with elements depending on the HC type.
Parameters
----------
X : np.ndarray
Design matrix of shape (N, K).
residuals : np.ndarray
OLS residuals of shape (N,).
XtX_inv : np.ndarray
Inverse of X'X matrix of shape (K, K).
hc_type : str
HC type: 'hc0', 'hc1', 'hc2', 'hc3', 'hc4', or 'robust'.
Returns
-------
np.ndarray
Variance-covariance matrix of shape (K, K).
Raises
------
ValueError
If hc_type is not a valid HC type.
Notes
-----
HC formulas (Ω_ii diagonal elements):
- HC0: e_i² (no adjustment)
- HC1: (n/(n-k)) × e_i² (degrees-of-freedom correction)
- HC2: e_i² / (1 - h_ii) (leverage adjustment)
- HC3: e_i² / (1 - h_ii)² (small-sample adjustment)
- HC4: e_i² / (1 - h_ii)^δ_i where δ_i = min(4, n·h_ii/k) (adaptive)
Typical ordering: SE(HC0) ≤ SE(HC1) and SE(HC2) ≤ SE(HC3).
"""
n, k = X.shape
e2 = residuals ** 2
# Case-insensitive matching for user convenience
hc_lower = hc_type.lower() if hc_type else None
if hc_lower == 'hc0':
# HC0: Basic heteroskedasticity-robust estimator without adjustment.
# Ω_ii = e_i²
omega_diag = e2
elif hc_lower in ('hc1', 'robust'):
# HC1: Degrees-of-freedom adjustment for finite samples.
# Ω_ii = (n/(n-k)) × e_i²
correction = n / (n - k)
omega_diag = correction * e2
elif hc_lower == 'hc2':
# HC2: Leverage-adjusted estimator using hat matrix diagonal.
# Ω_ii = e_i² / (1 - h_ii)
h_ii = _compute_leverage(X, XtX_inv)
omega_diag = e2 / (1 - h_ii)
elif hc_lower == 'hc3':
# HC3: Small-sample adjustment using squared leverage correction.
# Ω_ii = e_i² / (1 - h_ii)²
# Recommended for small samples with few treated or control units.
h_ii = _compute_leverage(X, XtX_inv)
omega_diag = e2 / ((1 - h_ii) ** 2)
elif hc_lower == 'hc4':
# HC4: Adaptive leverage adjustment for high-influence observations.
# Ω_ii = e_i² / (1 - h_ii)^δ_i where δ_i = min(4, n·h_ii/k)
# Provides stronger adjustment for observations with extreme leverage.
h_ii = _compute_leverage(X, XtX_inv)
delta = np.minimum(4.0, n * h_ii / k)
# Guard against numerical issues from negative leverage estimates
delta = np.maximum(delta, 0.0)
omega_diag = e2 / ((1 - h_ii) ** delta)
else:
valid_types = "'hc0', 'hc1', 'hc2', 'hc3', 'hc4', 'robust'"
raise ValueError(
f"Unknown HC type: '{hc_type}'. Valid options: {valid_types}"
)
# Sandwich variance: V̂ = (X'X)^{-1} X' Ω X (X'X)^{-1}
# Using efficient computation: meat = X.T @ diag(omega) @ X
meat = X.T @ np.diag(omega_diag) @ X
var_beta = XtX_inv @ meat @ XtX_inv
return var_beta
def _compute_hc1_variance(
X: np.ndarray,
residuals: np.ndarray,
XtX_inv: np.ndarray,
) -> np.ndarray:
"""
Compute HC1 variance-covariance matrix.
This is a convenience wrapper for backward compatibility with existing
test code that imports this function directly.
Parameters
----------
X : np.ndarray
Design matrix of shape (N, K).
residuals : np.ndarray
OLS residuals of shape (N,).
XtX_inv : np.ndarray
Inverse of X'X matrix of shape (K, K).
Returns
-------
np.ndarray
HC1 variance-covariance matrix of shape (K, K).
Raises
------
ValueError
If n <= k (insufficient degrees of freedom).
See Also
--------
_compute_hc_variance : General HC variance computation.
"""
n, k = X.shape
if n <= k:
raise ValueError(
f"HC1 variance requires n > k. Got n={n}, k={k}."
)
return _compute_hc_variance(X, residuals, XtX_inv, 'hc1')
[docs]
def run_ols_regression(
data: pd.DataFrame,
y: str,
d: str,
controls: list[str] | None = None,
vce: str | None = None,
cluster_var: str | None = None,
alpha: float = 0.05,
) -> dict[str, Any]:
"""
Estimate the ATT via OLS regression on cross-sectional data.
Regresses the transformed outcome on a constant and treatment indicator,
optionally including control variables. When controls are included and
sample sizes permit, interactions with the treatment indicator (centered
at treated-group mean) are added to allow heterogeneous covariate effects.
Parameters
----------
data : pd.DataFrame
Cross-sectional data with one row per unit.
y : str
Name of the dependent variable column (transformed outcome).
d : str
Name of the treatment indicator column.
controls : list of str, optional
Names of control variable columns.
vce : {None, 'hc0', 'hc1', 'hc2', 'hc3', 'hc4', 'robust', 'cluster'}, optional
Variance estimation method (case-insensitive):
- None: Homoskedastic OLS standard errors. Enables exact t-based
inference under normality assumption.
- 'hc0': Basic heteroskedasticity-robust. No finite-sample adjustment.
- 'hc1' or 'robust': HC1 with degrees-of-freedom correction n/(n-k).
- 'hc2': Leverage-adjusted using (1 - h_ii)^{-1}.
- 'hc3': Small-sample adjusted using (1 - h_ii)^{-2}. Recommended
for small samples with few treated or control units.
- 'hc4': Adaptive leverage correction using δ_i = min(4, n·h_ii/k).
Recommended when extreme leverage exists.
- 'cluster': Cluster-robust. Requires ``cluster_var``.
cluster_var : str, optional
Cluster variable column. Required when vce='cluster'.
alpha : float, default=0.05
Significance level for confidence interval construction.
Returns
-------
dict
Estimation results with keys: 'att', 'se', 'ci_lower', 'ci_upper',
't_stat', 'pvalue', 'nobs', 'df_resid', 'df_inference'.
Raises
------
ValueError
If required columns are missing, sample size is insufficient,
or the design matrix is singular.
See Also
--------
estimate_cohort_time_effects : Estimate ATT for all cohort-period pairs.
_compute_hc_variance : HC variance computation details.
Notes
-----
HC standard error ordering (typical):
- SE(HC0) ≤ SE(HC1) due to degrees-of-freedom correction
- SE(HC2) ≤ SE(HC3) due to stronger leverage adjustment
- SE(HC4) adapts based on leverage; may exceed HC3 for high-leverage obs
"""
if y not in data.columns:
raise ValueError(f"Dependent variable '{y}' not in data")
if d not in data.columns:
raise ValueError(f"Treatment indicator '{d}' not in data")
valid_mask = data[y].notna() & data[d].notna()
data_clean = data[valid_mask].copy()
n = len(data_clean)
if n < 2:
raise ValueError(f"Insufficient observations: n={n}, need at least 2")
y_vals = data_clean[y].values.astype(float)
d_vals = data_clean[d].values.astype(float)
if controls is not None and len(controls) > 0:
missing_controls = [c for c in controls if c not in data_clean.columns]
if missing_controls:
raise ValueError(f"Control variables not found: {missing_controls}")
# Complete cases required for covariate adjustment to avoid bias
control_valid = data_clean[controls].notna().all(axis=1)
data_clean = data_clean[control_valid].copy()
y_vals = data_clean[y].values.astype(float)
d_vals = data_clean[d].values.astype(float)
n = len(data_clean)
if n < 2:
raise ValueError(f"Insufficient observations after dropping missing controls: n={n}")
treated_mask = d_vals == 1
n_treated = treated_mask.sum()
n_control = (~treated_mask).sum()
K = len(controls)
if n_treated > K + 1 and n_control > K + 1:
# Centering covariates at treated-group mean ensures the coefficient
# on D directly estimates ATT; interaction terms allow heterogeneous
# slopes across treatment and control groups.
X_controls = data_clean[controls].values.astype(float)
X_mean_treated = X_controls[treated_mask].mean(axis=0)
X_centered = X_controls - X_mean_treated
X_interactions = d_vals.reshape(-1, 1) * X_centered
X = np.column_stack([
np.ones(n),
d_vals,
X_controls,
X_interactions
])
else:
# Insufficient sample size for covariate adjustment
warnings.warn(
f"Controls not included: N_treated={n_treated}, N_control={n_control}, K={K}. "
f"Need N_treated > K+1 and N_control > K+1.",
UserWarning
)
X = np.column_stack([np.ones(n), d_vals])
else:
# Without covariates, simple difference-in-means identifies the ATT
X = np.column_stack([np.ones(n), d_vals])
try:
XtX_inv = np.linalg.inv(X.T @ X)
except np.linalg.LinAlgError:
raise ValueError("Design matrix is singular, cannot estimate")
beta = XtX_inv @ (X.T @ y_vals)
y_hat = X @ beta
residuals = y_vals - y_hat
df_resid = n - X.shape[1]
if df_resid <= 0:
att = beta[1]
warnings.warn(
f"No degrees of freedom (n={n}, k={X.shape[1]}). "
f"Point estimate is valid but SE, CI, and p-value are unavailable.",
UserWarning
)
return {
'att': att,
'se': np.nan,
'ci_lower': np.nan,
'ci_upper': np.nan,
't_stat': np.nan,
'pvalue': np.nan,
'nobs': n,
'df_resid': df_resid,
'df_inference': 0,
}
sigma2 = (residuals ** 2).sum() / df_resid
# Case-insensitive matching for user convenience
vce_lower = vce.lower() if vce else None
if vce_lower is None:
# Homoskedastic variance enables exact t-based inference
var_beta = sigma2 * XtX_inv
df_inference = df_resid
elif vce_lower in ('hc0', 'hc1', 'hc2', 'hc3', 'hc4', 'robust'):
# Heteroskedasticity-consistent standard errors
var_beta = _compute_hc_variance(X, residuals, XtX_inv, vce_lower)
df_inference = df_resid
elif vce_lower == 'cluster':
if cluster_var is None:
raise InvalidParameterError(
"vce='cluster' requires cluster_var parameter"
)
if cluster_var not in data_clean.columns:
raise ValueError(f"Cluster variable '{cluster_var}' not in data")
cluster_ids = data_clean[cluster_var].values
unique_clusters = np.unique(cluster_ids)
G = len(unique_clusters)
if G < 2:
raise ValueError(f"Need at least 2 clusters, got {G}")
if G < 20:
warnings.warn(
f"Few clusters (G={G} < 20). Over-rejection of the null "
f"hypothesis is likely with cluster-robust standard errors. "
f"Consider wild cluster bootstrap or bias-corrected methods. "
f"See Cameron, Gelbach & Miller (2008) and Cameron & Miller (2015).",
UserWarning,
stacklevel=2,
)
# Sum of cluster-level outer products accounts for within-cluster
# correlation of errors; this is the "meat" of the sandwich estimator.
meat = np.zeros((X.shape[1], X.shape[1]))
for cluster in unique_clusters:
mask = cluster_ids == cluster
X_g = X[mask]
e_g = residuals[mask]
score_g = X_g.T @ e_g
meat += np.outer(score_g, score_g)
# Finite-sample correction reduces bias with few clusters (G < 50)
correction = (G / (G - 1)) * ((n - 1) / (n - X.shape[1]))
var_beta = correction * XtX_inv @ meat @ XtX_inv
df_inference = G - 1
else:
valid_options = "None, 'hc0', 'hc1', 'hc2', 'hc3', 'hc4', 'robust', 'cluster'"
raise ValueError(
f"Unknown vce type: '{vce}'. Valid options: {valid_options}"
)
# Treatment coefficient is in position 1 regardless of whether controls included
att = beta[1]
se = np.sqrt(var_beta[1, 1])
if se > 0:
t_stat = att / se
pvalue = 2 * (1 - stats.t.cdf(abs(t_stat), df_inference))
else:
t_stat = np.nan
pvalue = np.nan
t_crit = stats.t.ppf(1 - alpha / 2, df_inference)
ci_lower = att - t_crit * se
ci_upper = att + t_crit * se
return {
'att': att,
'se': se,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
't_stat': t_stat,
'pvalue': pvalue,
'nobs': n,
'df_resid': df_resid,
'df_inference': df_inference,
}
[docs]
def estimate_cohort_time_effects(
data_transformed: pd.DataFrame,
gvar: str,
ivar: str,
tvar: str,
controls: list[str] | None = None,
vce: str | None = None,
cluster_var: str | None = None,
control_strategy: str = 'not_yet_treated',
never_treated_values: list | None = None,
min_obs: int = 3,
min_treated: int = 1,
min_control: int = 1,
alpha: float = 0.05,
estimator: str = 'ra',
transform_type: str = 'demean',
propensity_controls: list[str] | None = None,
trim_threshold: float = 0.01,
se_method: str = 'analytical',
n_neighbors: int = 1,
with_replacement: bool = True,
caliper: float | None = None,
return_diagnostics: bool = False,
match_order: str | None = None,
) -> list[CohortTimeEffect]:
"""
Estimate treatment effects for all valid cohort-period pairs.
Iterates over all treatment cohorts and their post-treatment periods,
estimating the ATT for each (cohort, period) combination using the
specified estimation method. For each pair, the estimation sample
consists of the treatment cohort units plus valid control units
determined by the control group strategy.
Parameters
----------
data_transformed : pd.DataFrame
Panel data containing transformed outcome columns generated by
``transform_staggered_demean`` or ``transform_staggered_detrend``.
Must include columns for gvar, ivar, tvar, and transformed
outcomes named 'ydot_g{g}_r{r}' or 'ycheck_g{g}_r{r}'.
gvar : str
Name of the cohort variable column indicating first treatment period.
ivar : str
Name of the unit identifier column.
tvar : str
Name of the time variable column.
controls : list of str, optional
Names of time-invariant control variable columns.
vce : str, optional
Variance estimation type: None (homoskedastic), 'hc3'
(heteroskedasticity-robust), or 'cluster' (cluster-robust).
cluster_var : str, optional
Name of the cluster variable column. Required when vce='cluster'.
control_strategy : str, default='not_yet_treated'
Control group selection strategy: 'never_treated' uses only
never-treated units; 'not_yet_treated' includes units first
treated after the current period; 'all_others' uses all units
not in the treatment cohort (including already-treated units);
'auto' selects based on data availability.
never_treated_values : list, optional
Values in gvar indicating never-treated units. Defaults to
[0, np.inf] and NaN values.
min_obs : int, default=3
Minimum total sample size required for estimation.
min_treated : int, default=1
Minimum number of treated units required.
min_control : int, default=1
Minimum number of control units required.
alpha : float, default=0.05
Significance level for confidence interval construction.
estimator : str, default='ra'
Estimation method: 'ra' (regression adjustment), 'ipwra'
(inverse probability weighted regression adjustment), or
'psm' (propensity score matching).
transform_type : str, default='demean'
Transformation type applied to the data: 'demean' or 'detrend'.
Determines the column prefix for transformed outcomes.
propensity_controls : list of str, optional
Control variables for the propensity score model. If None,
uses the same variables as ``controls``.
trim_threshold : float, default=0.01
Propensity score trimming threshold for IPWRA and PSM.
Observations with extreme propensity scores are excluded.
se_method : str, default='analytical'
Standard error method for IPWRA: 'analytical' or 'bootstrap'.
n_neighbors : int, default=1
Number of nearest neighbors for PSM matching.
with_replacement : bool, default=True
Whether PSM matching allows replacement.
caliper : float, optional
Maximum propensity score distance for PSM matching.
return_diagnostics : bool, default=False
Reserved for future use. Currently has no effect.
match_order : str, optional
Reserved for future use. Currently has no effect.
Returns
-------
list of CohortTimeEffect
Estimation results for all valid (cohort, period) pairs,
sorted by cohort and period.
Raises
------
ValueError
If required columns are missing, no valid treatment cohorts
exist, or parameter values are invalid.
See Also
--------
transform_staggered_demean : Demeaning transformation for staggered designs.
transform_staggered_detrend : Detrending transformation for staggered designs.
aggregate_to_cohort : Aggregate cohort-time effects to cohort-level effects.
aggregate_to_overall : Aggregate effects to a single overall estimate.
Notes
-----
The estimation proceeds separately for each (cohort, period) pair. For
treatment cohort g in calendar time r, the control group consists of
never-treated units and units first treated after period r. This rolling
selection ensures that control units satisfy no anticipation at time r.
Under the 'not_yet_treated' strategy, the control pool varies by period:
earlier periods have more controls available since fewer cohorts have
been treated. The 'never_treated' strategy uses a fixed control pool
across all periods, which may be more restrictive but avoids potential
confounding from units that eventually receive treatment.
Sample size requirements (min_obs, min_treated, min_control) are checked
before estimation. Pairs that fail these checks are silently skipped
and reported in the warning message.
"""
# =========================================================================
# Input Validation
# =========================================================================
required_cols = [gvar, ivar, tvar]
missing = [c for c in required_cols if c not in data_transformed.columns]
if missing:
raise ValueError(f"Missing required columns: {missing}")
if vce == 'cluster' and cluster_var is None:
raise ValueError("cluster_var required when vce='cluster'")
strategy_map = {
'never_treated': ControlGroupStrategy.NEVER_TREATED,
'not_yet_treated': ControlGroupStrategy.NOT_YET_TREATED,
'all_others': ControlGroupStrategy.ALL_OTHERS,
'auto': ControlGroupStrategy.AUTO,
}
if control_strategy not in strategy_map:
raise ValueError(
f"Invalid control_strategy: {control_strategy}. "
f"Must be one of: {list(strategy_map.keys())}"
)
strategy = strategy_map[control_strategy]
# =========================================================================
# Cohort and Period Extraction
# =========================================================================
if never_treated_values is None:
nt_values = [0, np.inf]
else:
nt_values = never_treated_values
cohorts = get_cohorts(data_transformed, gvar, ivar, nt_values)
if len(cohorts) == 0:
raise ValueError("No valid treatment cohorts found in data.")
T_max = int(data_transformed[tvar].max())
# =========================================================================
# Cohort-Time Effect Estimation
# =========================================================================
results = []
skipped_pairs = []
# Column prefix reflects transformation type: 'ydot' for demeaned, 'ycheck' for detrended
prefix = 'ydot' if transform_type == 'demean' else 'ycheck'
def _safe_int(value: Any) -> int:
"""Coerce possibly-missing numeric values to a safe int (NaN/None -> 0)."""
if value is None:
return 0
try:
value_f = float(value)
except (TypeError, ValueError):
return 0
if not np.isfinite(value_f):
return 0
return int(value_f)
for g in cohorts:
valid_periods = get_valid_periods_for_cohort(g, T_max)
for r in valid_periods:
ydot_col = f'{prefix}_g{g}_r{r}'
if ydot_col not in data_transformed.columns:
skipped_pairs.append((g, r, 'missing_transform_column'))
continue
# -------------------------------------------------------------------------
# Extract Period Cross-Section
# -------------------------------------------------------------------------
period_data = data_transformed[data_transformed[tvar] == r].copy()
if len(period_data) == 0:
skipped_pairs.append((g, r, 'no_data_in_period'))
continue
# -------------------------------------------------------------------------
# Identify Valid Control Units
# -------------------------------------------------------------------------
try:
unit_control_mask = get_valid_control_units(
period_data, gvar, ivar,
cohort=g, period=r,
strategy=strategy,
never_treated_values=nt_values,
)
except Exception as e:
skipped_pairs.append((g, r, f'control_mask_error: {e}'))
continue
# Map unit-level control status to observations; missing units default
# to non-control to ensure conservative sample construction.
control_mask = period_data[ivar].map(unit_control_mask).fillna(False).astype(bool)
# -------------------------------------------------------------------------
# Construct Estimation Sample
# -------------------------------------------------------------------------
treat_mask = (period_data[gvar] == g)
sample_mask = treat_mask | control_mask
n_treat = treat_mask.sum()
n_control = control_mask.sum()
n_total = sample_mask.sum()
if n_total < min_obs:
skipped_pairs.append((g, r, f'insufficient_total: {n_total}<{min_obs}'))
continue
if n_treat < min_treated:
skipped_pairs.append((g, r, f'insufficient_treated: {n_treat}<{min_treated}'))
continue
if n_control < min_control:
skipped_pairs.append((g, r, f'insufficient_control: {n_control}<{min_control}'))
continue
sample_data = period_data[sample_mask].copy()
# Binary indicator needed for OLS: 1 for cohort g, 0 for controls
sample_data['_D_treat'] = (sample_data[gvar] == g).astype(int)
# -------------------------------------------------------------------------
# Run Estimator
# -------------------------------------------------------------------------
try:
if estimator == 'ra':
est_result = run_ols_regression(
data=sample_data,
y=ydot_col,
d='_D_treat',
controls=controls,
vce=vce,
cluster_var=cluster_var,
alpha=alpha,
)
elif estimator == 'ipwra':
est_result = _estimate_single_effect_ipwra(
data=sample_data,
y=ydot_col,
d='_D_treat',
controls=controls or [],
propensity_controls=propensity_controls or controls or [],
trim_threshold=trim_threshold,
se_method=se_method,
alpha=alpha,
)
elif estimator == 'psm':
est_result = _estimate_single_effect_psm(
data=sample_data,
y=ydot_col,
d='_D_treat',
propensity_controls=propensity_controls or controls or [],
n_neighbors=n_neighbors,
with_replacement=with_replacement,
caliper=caliper,
trim_threshold=trim_threshold,
alpha=alpha,
)
else:
raise ValueError(f"Unknown estimator: {estimator}")
except Exception as e:
skipped_pairs.append((g, r, f'{estimator}_error: {e}'))
continue
results.append(CohortTimeEffect(
cohort=int(g),
period=int(r),
event_time=int(r - g),
att=est_result['att'],
se=est_result['se'],
ci_lower=est_result['ci_lower'],
ci_upper=est_result['ci_upper'],
t_stat=est_result['t_stat'],
pvalue=est_result['pvalue'],
n_treated=int(n_treat),
n_control=int(n_control),
n_total=int(n_total),
df_resid=_safe_int(est_result.get('df_resid', 0)),
df_inference=_safe_int(est_result.get('df_inference', 0)),
))
# =========================================================================
# Reporting
# =========================================================================
if skipped_pairs:
n_skipped = len(skipped_pairs)
n_total_pairs = sum(len(get_valid_periods_for_cohort(g, T_max)) for g in cohorts)
warnings.warn(
f"Skipped {n_skipped}/{n_total_pairs} (cohort, period) pairs due to "
f"insufficient data or errors. Use verbose=True for details.",
UserWarning
)
results.sort(key=lambda x: (x.cohort, x.period))
return results
[docs]
def results_to_dataframe(results: list[CohortTimeEffect]) -> pd.DataFrame:
"""
Convert a list of CohortTimeEffect objects to a pandas DataFrame.
Parameters
----------
results : list of CohortTimeEffect
Estimation results from ``estimate_cohort_time_effects``.
Returns
-------
pd.DataFrame
DataFrame with columns: cohort, period, event_time, att, se,
ci_lower, ci_upper, t_stat, pvalue, n_treated, n_control, n_total.
Returns an empty DataFrame with appropriate columns if the input
list is empty.
See Also
--------
estimate_cohort_time_effects : Estimate ATT for all cohort-period pairs.
CohortTimeEffect : Container class for individual effect estimates.
"""
if len(results) == 0:
return pd.DataFrame(columns=[
'cohort', 'period', 'event_time', 'att', 'se',
'ci_lower', 'ci_upper', 't_stat', 'pvalue',
'n_treated', 'n_control', 'n_total',
'df_resid', 'df_inference'
])
return pd.DataFrame([
{
'cohort': r.cohort,
'period': r.period,
'event_time': r.event_time,
'att': r.att,
'se': r.se,
'ci_lower': r.ci_lower,
'ci_upper': r.ci_upper,
't_stat': r.t_stat,
'pvalue': r.pvalue,
'n_treated': r.n_treated,
'n_control': r.n_control,
'n_total': r.n_total,
'df_resid': r.df_resid,
'df_inference': r.df_inference,
}
for r in results
])
# =============================================================================
# Internal Estimator Wrappers
# =============================================================================
def _estimate_single_effect_ipwra(
data: pd.DataFrame,
y: str,
d: str,
controls: list[str],
propensity_controls: list[str],
trim_threshold: float = 0.01,
se_method: str = 'analytical',
alpha: float = 0.05,
) -> dict[str, Any]:
"""
Estimate ATT using inverse probability weighted regression adjustment.
Internal wrapper that calls the IPWRA estimator and returns results
in a standardized dictionary format consistent with ``run_ols_regression``.
Parameters
----------
data : pd.DataFrame
Cross-sectional sample data with one row per unit.
y : str
Name of the transformed outcome variable column.
d : str
Name of the treatment indicator column.
controls : list of str
Control variables for the outcome regression model.
propensity_controls : list of str
Control variables for the propensity score model.
trim_threshold : float, default=0.01
Threshold for trimming extreme propensity scores.
se_method : str, default='analytical'
Standard error computation method: 'analytical' or 'bootstrap'.
alpha : float, default=0.05
Significance level for confidence interval construction.
Returns
-------
dict
Estimation results with keys: 'att', 'se', 'ci_lower', 'ci_upper',
't_stat', 'pvalue', 'nobs', 'df_resid', 'df_inference', 'estimator'.
Returns NaN values if estimation fails.
"""
try:
result: IPWRAResult = estimate_ipwra(
data=data,
y=y,
d=d,
controls=controls,
propensity_controls=propensity_controls,
trim_threshold=trim_threshold,
se_method=se_method,
alpha=alpha
)
return {
'att': result.att,
'se': result.se,
'ci_lower': result.ci_lower,
'ci_upper': result.ci_upper,
't_stat': result.t_stat,
'pvalue': result.pvalue,
'nobs': result.n_treated + result.n_control,
'df_resid': result.n_treated + result.n_control - len(controls) - 1,
'df_inference': result.n_treated + result.n_control - 2,
'estimator': 'ipwra',
}
except Exception as e:
warnings.warn(
f"IPWRA estimation failed: {str(e)}. Returning NaN.",
UserWarning
)
return {
'att': np.nan,
'se': np.nan,
'ci_lower': np.nan,
'ci_upper': np.nan,
't_stat': np.nan,
'pvalue': np.nan,
'nobs': len(data),
'df_resid': np.nan,
'df_inference': np.nan,
'estimator': 'ipwra',
'error': str(e),
}
def _estimate_single_effect_psm(
data: pd.DataFrame,
y: str,
d: str,
propensity_controls: list[str],
n_neighbors: int = 1,
with_replacement: bool = True,
caliper: float | None = None,
trim_threshold: float = 0.01,
alpha: float = 0.05,
) -> dict[str, Any]:
"""
Estimate ATT using propensity score matching.
Internal wrapper that calls the PSM estimator and returns results
in a standardized dictionary format consistent with ``run_ols_regression``.
Parameters
----------
data : pd.DataFrame
Cross-sectional sample data with one row per unit.
y : str
Name of the transformed outcome variable column.
d : str
Name of the treatment indicator column.
propensity_controls : list of str
Control variables for the propensity score model.
n_neighbors : int, default=1
Number of nearest neighbors for matching each treated unit.
with_replacement : bool, default=True
Whether control units can be matched to multiple treated units.
caliper : float, optional
Maximum propensity score distance for valid matches.
trim_threshold : float, default=0.01
Threshold for trimming extreme propensity scores.
alpha : float, default=0.05
Significance level for confidence interval construction.
Returns
-------
dict
Estimation results with keys: 'att', 'se', 'ci_lower', 'ci_upper',
't_stat', 'pvalue', 'nobs', 'df_resid', 'df_inference', 'estimator',
'n_matched', 'n_dropped'. Returns NaN values if estimation fails.
"""
try:
result: PSMResult = estimate_psm(
data=data,
y=y,
d=d,
propensity_controls=propensity_controls,
n_neighbors=n_neighbors,
with_replacement=with_replacement,
caliper=caliper,
trim_threshold=trim_threshold,
alpha=alpha
)
return {
'att': result.att,
'se': result.se,
'ci_lower': result.ci_lower,
'ci_upper': result.ci_upper,
't_stat': result.t_stat,
'pvalue': result.pvalue,
'nobs': result.n_treated + result.n_matched,
'df_resid': result.n_treated + result.n_matched - 1,
'df_inference': result.n_treated + result.n_matched - 2,
'estimator': 'psm',
'n_matched': result.n_matched,
'n_dropped': result.n_dropped,
}
except Exception as e:
warnings.warn(
f"PSM estimation failed: {str(e)}. Returning NaN.",
UserWarning
)
return {
'att': np.nan,
'se': np.nan,
'ci_lower': np.nan,
'ci_upper': np.nan,
't_stat': np.nan,
'pvalue': np.nan,
'nobs': len(data),
'df_resid': np.nan,
'df_inference': np.nan,
'estimator': 'psm',
'error': str(e),
}