"""
Cross-sectional OLS estimation for difference-in-differences analysis.
This module implements OLS regression on transformed outcome variables for
estimating average treatment effects on the treated (ATT) in common timing
difference-in-differences designs. After unit-specific time-series
transformations remove pre-treatment patterns, the estimation problem
reduces to standard cross-sectional regression.
The module provides three main functions:
- ``prepare_controls`` : Constructs centered controls and interaction terms
for regression adjustment.
- ``estimate_att`` : Estimates the ATT using cross-sectional OLS on the
first post-treatment period.
- ``estimate_period_effects`` : Estimates period-specific ATTs via
independent cross-sectional regressions for each post-treatment period.
Supported variance-covariance estimators include homoskedastic OLS,
heteroskedasticity-robust (HC1, HC3), and cluster-robust standard errors.
Notes
-----
The transformation-based approach converts the panel data DiD problem into
a cross-sectional treatment effects problem. Under no anticipation and
parallel trends assumptions, the ATT is identified as the coefficient on
the treatment indicator in OLS regression of the transformed outcome.
For small samples, homoskedastic standard errors with t-distribution
critical values provide exact inference under normality. HC3 standard
errors offer improved finite-sample performance over HC1 when sample sizes
are moderate but asymptotic theory may not apply.
Cluster-robust inference uses G-1 degrees of freedom (where G is the number
of clusters) rather than the residual degrees of freedom, which provides
more conservative inference when the number of clusters is small.
"""
import warnings
from typing import Any
import numpy as np
import pandas as pd
from scipy import stats
import statsmodels.api as sm
from .exceptions import InvalidParameterError, InvalidVCETypeError, InsufficientDataError
[docs]
def prepare_controls(
data: pd.DataFrame,
d: str,
ivar: str,
controls: list[str],
N_treated: int,
N_control: int,
data_sample: pd.DataFrame | None = None,
) -> dict[str, Any]:
"""
Construct centered controls and interactions for regression adjustment.
Verifies that sample sizes satisfy N_treated > K+1 and N_control > K+1,
where K is the number of control variables. If conditions are met,
computes the mean of control variables for the treated group, centers
the controls by subtracting these means, and creates interaction terms
between the treatment indicator and the centered controls.
Parameters
----------
data : pd.DataFrame
Transformed panel data containing control variables and treatment
indicators.
d : str
Name of the treatment indicator column.
ivar : str
Name of the unit identifier column.
controls : list[str]
List of control variable names. Must be numeric and time-invariant.
N_treated : int
Number of treated units in the regression sample.
N_control : int
Number of control units in the regression sample.
data_sample : pd.DataFrame | None, optional
Specific sample to use for computing means (e.g., first-post-treatment
cross-section). If None, uses the full ``data`` filtered by treatment
status.
Returns
-------
dict[str, Any]
A dictionary containing:
- 'include' : bool
Whether controls can be included in regression.
- 'X_centered' : pd.DataFrame or None
Centered control variables (X - X_mean_treated).
- 'interactions' : pd.DataFrame or None
Interaction terms (D * X_centered).
- 'X_mean_treated' : dict
Means of controls for the treated group.
- 'RHS_varnames' : list of str
Names of all right-hand side control variables.
See Also
--------
estimate_att : Uses prepared controls for ATT estimation.
estimate_period_effects : Uses prepared controls for period-specific ATTs.
Notes
-----
Centering controls at the treated-group mean ensures that the coefficient
on the treatment indicator directly estimates the ATT at the mean covariate
values of the treated group. The regression model is:
.. math::
\\hat{Y}_i = \\alpha + \\tau D_i + X_i \\beta +
D_i (X_i - \\bar{X}_1) \\delta + U_i
where :math:`\\bar{X}_1 = N_1^{-1} \\sum_{i: D_i=1} X_i` is the
treated-group mean of covariates.
The sample size conditions (N_treated > K+1 and N_control > K+1) ensure
that both treated and control subsamples have sufficient degrees of
freedom for separate slope estimation.
"""
K_ctrl = len(controls)
nk = K_ctrl + 1
# Check sample size conditions to ensure sufficient df for separate slopes.
if N_treated > nk and N_control > nk:
# Compute unit-weighted treated-group mean (not row-weighted).
# Using groupby().first() avoids double-counting units with multiple
# time periods in the sample, since X is time-invariant by assumption.
X_mean_treated = {}
for x in controls:
if data_sample is not None:
X_mean_treated[x] = (
data_sample[data_sample[d] == 1]
.groupby(ivar)[x]
.first()
.mean()
)
else:
X_mean_treated[x] = (
data[data[d] == 1]
.groupby(ivar)[x]
.first()
.mean()
)
# Center controls at treated-group mean so the treatment coefficient
# directly estimates ATT evaluated at mean X of treated units.
X_centered = pd.DataFrame({
f'{x}_c': data[x] - X_mean_treated[x]
for x in controls
})
# Interaction terms allow heterogeneous treatment effects across X.
# D * (X - X_bar_1) captures how the treatment effect varies with X.
interactions = pd.DataFrame({
f'd_{x}_c': data[d] * X_centered[f'{x}_c']
for x in controls
})
RHS_varnames = controls + [f'd_{x}_c' for x in controls]
return {
'include': True,
'X_centered': X_centered,
'interactions': interactions,
'X_mean_treated': X_mean_treated,
'RHS_varnames': RHS_varnames
}
else:
warnings.warn(
f"Controls not applied: sample does not satisfy N_1 > K+1 and N_0 > K+1\n"
f" (where K={K_ctrl} is the number of control variables)\n"
f" Found: N_1={N_treated}, K+1={nk}, condition N_1 > K+1: {N_treated > nk}\n"
f" N_0={N_control}, K+1={nk}, condition N_0 > K+1: {N_control > nk}\n"
f" Controls will be ignored in the regression.",
UserWarning,
stacklevel=3
)
return {
'include': False,
'X_centered': None,
'interactions': None,
'X_mean_treated': {},
'RHS_varnames': []
}
[docs]
def estimate_att(
data: pd.DataFrame,
y_transformed: str,
d: str,
ivar: str,
controls: list[str] | None,
vce: str | None,
cluster_var: str | None,
sample_filter: pd.Series,
alpha: float = 0.05,
) -> dict[str, Any]:
"""
Estimate Average Treatment Effect on the Treated (ATT) via cross-sectional OLS.
Performs an OLS regression on the specified cross-section (typically the
first post-treatment period) using the transformed outcome variable.
Supports homoskedastic, heteroskedasticity-robust (HC1, HC3), and
cluster-robust standard errors.
Parameters
----------
data : pd.DataFrame
Transformed panel data containing the dependent variable and
regressors.
y_transformed : str
Name of the transformed dependent variable (e.g., 'ydot_postavg').
d : str
Name of the treatment indicator column (typically ``d_``).
ivar : str
Name of the unit identifier column.
controls : list[str] | None
List of time-invariant control variables. Controls are included only
if sample size conditions (N_1 > K+1 and N_0 > K+1) are met.
vce : str | None
Type of variance-covariance estimator:
- None : Homoskedastic standard errors (OLS).
- 'robust' or 'hc1' : Heteroskedasticity-robust (HC1).
- 'hc3' : Heteroskedasticity-robust (HC3), better for small samples.
- 'cluster' : Cluster-robust. Requires ``cluster_var``.
cluster_var : str | None
Name of the variable to use for clustering. Required if
``vce='cluster'``.
sample_filter : pd.Series
Boolean mask indicating the regression sample (e.g., first
post-treatment period).
alpha : float, default=0.05
Significance level for confidence intervals (0.05 gives 95% CI).
Returns
-------
dict[str, Any]
A dictionary containing estimation results:
- 'att' : float
Estimated ATT.
- 'se_att' : float
Standard error of the ATT.
- 't_stat' : float
t-statistic for H0: ATT = 0.
- 'pvalue' : float
Two-sided p-value.
- 'ci_lower', 'ci_upper' : float
Confidence interval bounds.
- 'params' : pd.Series
All regression coefficients.
- 'bse' : pd.Series
Standard errors of all coefficients.
- 'vcov' : pd.DataFrame
Variance-covariance matrix.
- 'resid' : np.ndarray
Residuals.
- 'nobs' : int
Number of observations.
- 'df_resid' : int
Residual degrees of freedom.
- 'df_inference' : int
Degrees of freedom used for inference.
- 'vce_type' : str
Type of VCE used.
- 'cluster_var' : str or None
Clustering variable if used.
- 'n_clusters' : int or None
Number of clusters if clustered.
- 'controls_used' : bool
Whether controls were included.
- 'controls' : list of str
Controls used in regression.
- 'controls_spec' : dict
Details of control preparation.
- 'n_treated_sample', 'n_control_sample' : int
Sample sizes by treatment status.
Raises
------
InsufficientDataError
If sample size is less than 3, or if no treated or control units
exist in the regression sample.
InvalidParameterError
If cluster_var is missing when vce='cluster', or if cluster variable
is not nested within units.
InvalidVCETypeError
If an unrecognized vce type is specified.
See Also
--------
prepare_controls : Prepares centered controls for regression adjustment.
estimate_period_effects : Estimates period-specific ATTs.
Notes
-----
The ATT is estimated as the coefficient on the treatment indicator D in
the OLS regression:
.. math::
\\hat{Y}_i = \\alpha + \\tau D_i + X_i \\beta +
D_i (X_i - \\bar{X}_1) \\delta + U_i
Confidence intervals use t-distribution critical values:
.. math::
CI = \\hat{\\tau} \\pm t_{df, 1-\\alpha/2} \\cdot SE(\\hat{\\tau})
Degrees of freedom selection:
- For non-clustered standard errors: df = n - k (residual df).
- For cluster-robust standard errors: df = G - 1, where G is the number
of clusters. This provides more conservative inference when the number
of clusters is small.
Under homoskedasticity and normality, the t-statistic has an exact
t-distribution, enabling valid inference even with small samples. HC3
is recommended over HC1 for moderate sample sizes as it provides better
finite-sample performance.
"""
data_sample = data[sample_filter].copy()
if len(data_sample) < 3:
raise InsufficientDataError(
f"Insufficient sample size for regression: N={len(data_sample)} (need >= 3)"
)
N_treated_sample = int(data_sample[d].sum())
N_control_sample = int(len(data_sample) - N_treated_sample)
if N_treated_sample == 0:
raise InsufficientDataError(
f"Firstpost cross-section contains no treated units (N_treated=0, N_control={N_control_sample}). "
f"ATT estimation requires at least one treated unit. "
f"This typically occurs when all treated units exit the panel before the post-treatment period."
)
if N_control_sample == 0:
raise InsufficientDataError(
f"Firstpost cross-section contains no control units (N_treated={N_treated_sample}, N_control=0). "
f"ATT estimation requires at least one control unit for comparison. "
f"This typically occurs when all control units exit the panel before the post-treatment period."
)
controls_spec = None
controls_used = False
if controls is not None and len(controls) > 0:
controls_missing = data_sample[controls].isna().any(axis=1)
n_missing = controls_missing.sum()
if n_missing > 0:
data_sample_clean = data_sample[~controls_missing].copy()
N_treated_clean = int(data_sample_clean[d].sum())
N_control_clean = int(len(data_sample_clean) - N_treated_clean)
K_ctrl = len(controls)
nk = K_ctrl + 1
if N_treated_clean > nk and N_control_clean > nk:
warnings.warn(
f"Dropped {n_missing} observations from the regression sample due to missing control variables.\n"
f" Original sample size: {len(data_sample)}\n"
f" Final sample size: {len(data_sample_clean)}\n"
f" Controls will be included in the regression.",
UserWarning,
stacklevel=3
)
data_sample = data_sample_clean
N_treated = N_treated_clean
N_control = N_control_clean
controls_spec = prepare_controls(data, d, ivar, controls, N_treated, N_control, data_sample)
X_centered_sample = controls_spec['X_centered'].loc[data_sample.index]
interactions_sample = controls_spec['interactions'].loc[data_sample.index]
X_parts = [
data_sample[[d]],
data_sample[controls],
interactions_sample
]
X_df = pd.concat(X_parts, axis=1)
X = sm.add_constant(X_df.astype(float).values)
controls_used = True
else:
warnings.warn(
f"Control variables not included: after dropping {n_missing} observations with missing controls, "
f"the sample would not satisfy N_1 > K+1 and N_0 > K+1.\n"
f" Would have: N_1={N_treated_clean}, N_0={N_control_clean}, K+1={nk}\n"
f" Controls will be ignored, and observations with missing controls will be retained.",
UserWarning,
stacklevel=3
)
d_vals = data_sample[d].astype(float).values
X = sm.add_constant(d_vals)
controls_used = False
else:
N_treated = int(data_sample[d].sum())
N_control = int(len(data_sample) - N_treated)
controls_spec = prepare_controls(data, d, ivar, controls, N_treated, N_control, data_sample)
if controls_spec['include']:
X_centered_sample = controls_spec['X_centered'].loc[data_sample.index]
interactions_sample = controls_spec['interactions'].loc[data_sample.index]
X_parts = [
data_sample[[d]],
data_sample[controls],
interactions_sample
]
X_df = pd.concat(X_parts, axis=1)
X = sm.add_constant(X_df.astype(float).values)
controls_used = True
else:
d_vals = data_sample[d].astype(float).values
X = sm.add_constant(d_vals)
controls_used = False
else:
d_vals = data_sample[d].astype(float).values
X = sm.add_constant(d_vals)
controls_used = False
y_vals = data_sample[y_transformed].values
model = sm.OLS(y_vals, X)
# Normalize vce parameter (case-insensitive)
vce_lower = vce.lower() if vce else None
if vce_lower is None:
results = model.fit()
elif vce_lower == 'hc0':
# HC0: basic heteroskedasticity-robust, no small-sample adjustment
results = model.fit(cov_type='HC0')
elif vce_lower in ('hc1', 'robust'):
# HC1: degrees-of-freedom correction n/(n-k)
n_treated = int((data_sample[d].astype(float).values == 1).sum())
n_control = int((data_sample[d].astype(float).values == 0).sum())
if n_treated < 2 or n_control < 2:
warnings.warn(
f"HC1 (robust) may be unstable with very small samples "
f"(N_treated={n_treated}, N_control={n_control}). "
f"Consider using vce=None for exact inference under normality.",
UserWarning,
stacklevel=3
)
results = model.fit(cov_type='HC1')
elif vce_lower == 'hc2':
# HC2: leverage-adjusted
results = model.fit(cov_type='HC2')
elif vce_lower == 'hc3':
# HC3: small-sample adjusted, suitable for moderate N
n_treated = int((data_sample[d].astype(float).values == 1).sum())
n_control = int((data_sample[d].astype(float).values == 0).sum())
if n_treated < 2 or n_control < 2:
warnings.warn(
f"HC3 may be unstable with very small samples "
f"(N_treated={n_treated}, N_control={n_control}). "
f"Consider using vce=None for exact inference under normality.",
UserWarning,
stacklevel=3
)
results = model.fit(cov_type='HC3')
elif vce_lower == 'hc4':
# HC4: adaptive leverage correction for high-leverage observations.
# statsmodels does not support HC4; compute manually.
results_ols = model.fit()
# Get OLS estimates
n_obs = len(y_vals)
k = X.shape[1]
residuals = results_ols.resid
# Compute leverage values efficiently
# Use pinv (pseudo-inverse) to handle singular/near-singular matrices
try:
XtX_inv = np.linalg.inv(X.T @ X)
except np.linalg.LinAlgError:
# Fall back to pseudo-inverse for singular matrices
XtX_inv = np.linalg.pinv(X.T @ X)
warnings.warn(
"Design matrix is singular or near-singular. "
"Using pseudo-inverse for HC4 variance estimation. "
"Results may be unreliable due to collinearity.",
UserWarning,
stacklevel=3
)
tmp = X @ XtX_inv
h_ii = (tmp * X).sum(axis=1)
h_ii = np.clip(h_ii, 0, 0.9999)
# HC4 adjustment: δ_i = min(4, n·h_ii/k)
delta = np.minimum(4.0, n_obs * h_ii / k)
delta = np.maximum(delta, 0.0)
# Omega diagonal
omega_diag = (residuals ** 2) / ((1 - h_ii) ** delta)
# Sandwich variance
meat = X.T @ np.diag(omega_diag) @ X
var_beta = XtX_inv @ meat @ XtX_inv
# Create a mock results object with HC4 standard errors
class HC4Results:
def __init__(self, ols_results, var_beta):
self.params = ols_results.params
# Handle both Series and ndarray for params
if hasattr(ols_results.params, 'index'):
self.bse = pd.Series(np.sqrt(np.diag(var_beta)), index=ols_results.params.index)
else:
self.bse = np.sqrt(np.diag(var_beta))
self.tvalues = self.params / self.bse
self.pvalues = 2 * (1 - stats.t.cdf(np.abs(self.tvalues), n_obs - k))
self.df_resid = ols_results.df_resid
self.nobs = ols_results.nobs
self.rsquared = ols_results.rsquared
self.rsquared_adj = ols_results.rsquared_adj
self.fvalue = ols_results.fvalue
self.f_pvalue = ols_results.f_pvalue
self.resid = ols_results.resid
self._cov_params = var_beta
def conf_int(self, alpha=0.05):
t_crit = stats.t.ppf(1 - alpha/2, self.df_resid)
ci_lower = self.params - t_crit * self.bse
ci_upper = self.params + t_crit * self.bse
return pd.DataFrame({'lower': ci_lower, 'upper': ci_upper})
def cov_params(self):
return self._cov_params
results = HC4Results(results_ols, var_beta)
elif vce_lower == 'cluster':
if cluster_var is None:
raise InvalidParameterError(
"vce='cluster' requires cluster_var parameter to be specified."
)
if cluster_var not in data_sample.columns:
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' not found in data."
)
n_clusters = data_sample[cluster_var].nunique()
if n_clusters < 2:
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' must have at least 2 unique values. "
f"Found: {n_clusters}"
)
if n_clusters < 10:
warnings.warn(
f"Cluster-robust inference with only {n_clusters} clusters may be unreliable. "
f"Statistical literature typically recommends >= 20-30 clusters for reliable inference. "
f"Consider using wild_cluster_bootstrap() from lwdid.inference for more reliable inference.",
UserWarning,
stacklevel=3
)
elif n_clusters < 20:
# Info-level warning for 10-19 clusters
warnings.warn(
f"Cluster-robust inference with {n_clusters} clusters. "
f"For improved reliability, consider using wild_cluster_bootstrap() from lwdid.inference.",
UserWarning,
stacklevel=3
)
# Check cluster size imbalance (CV > 1.0)
cluster_sizes = data_sample.groupby(cluster_var).size()
if cluster_sizes.mean() > 0:
cluster_size_cv = cluster_sizes.std() / cluster_sizes.mean()
if cluster_size_cv > 1.0:
warnings.warn(
f"Highly unbalanced cluster sizes (CV={cluster_size_cv:.2f}). "
f"Cluster sizes range from {cluster_sizes.min()} to {cluster_sizes.max()} "
f"(mean={cluster_sizes.mean():.1f}). "
f"This may affect the reliability of cluster-robust inference.",
UserWarning,
stacklevel=3
)
if data_sample[cluster_var].isna().any():
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' contains missing values."
)
# Verify cluster is nested within units (each unit belongs to one cluster).
# Cluster-robust SE assumes independence across clusters; if a unit
# belongs to multiple clusters, this assumption is violated.
ivar_cluster_map = data.groupby(ivar)[cluster_var].nunique()
if (ivar_cluster_map > 1).any():
violating_units = ivar_cluster_map[ivar_cluster_map > 1]
n_violating = len(violating_units)
example_unit = violating_units.index[0]
example_clusters = data[data[ivar] == example_unit][cluster_var].unique()
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' is not nested within unit variable '{ivar}'. "
f"{n_violating} unit(s) belong to multiple clusters across time periods. "
f"In panel data, each unit must belong to exactly one cluster. "
f"This is required for cluster-robust SE (assumes cluster independence). "
f"Violating units: {list(violating_units.index[:5])}{'...' if n_violating > 5 else ''}. "
f"Example: Unit {example_unit} belongs to {len(example_clusters)} different clusters {list(example_clusters)}."
)
cluster_groups = data_sample[cluster_var].values
model = sm.OLS(y_vals, X, missing='drop')
results = model.fit(
cov_type='cluster',
cov_kwds={'groups': cluster_groups}
)
G = len(np.unique(cluster_groups))
else:
raise InvalidVCETypeError(
f"Invalid vce type: '{vce}'. "
f"Valid options: None, 'hc0', 'hc1', 'hc2', 'hc3', 'hc4', 'robust', 'cluster'"
)
# Extract parameter estimates as numpy arrays for consistent indexing.
# statsmodels may return Series or ndarray depending on input format.
params_vals = results.params.values if hasattr(results.params, 'values') else results.params
bse_vals = results.bse.values if hasattr(results.bse, 'values') else results.bse
# Require at least 2 parameters: intercept (index 0) and treatment (index 1).
if len(params_vals) < 2:
raise InsufficientDataError(
f"Insufficient variation in treatment indicator (d_) in firstpost cross-section. "
f"Regression model has only {len(params_vals)} parameter(s) (expected >= 2). "
f"This typically occurs when all units in the firstpost sample have the same "
f"treatment status (all d_=0 or all d_=1). "
f"Please check that your data contains both treated and control units in the "
f"first post-treatment period."
)
# ATT is the coefficient on the treatment indicator (index 1).
# Index 0 is the intercept; indices 2+ are control variables if included.
att = params_vals[1]
se_att = bse_vals[1]
# Warn about near-zero SE which indicates potential numerical issues.
if se_att < 1e-10:
warnings.warn(
f"Standard error is extremely small (SE={se_att:.2e}). "
f"This may indicate perfect fit, numerical issues, or data problems. "
f"Results (t-statistic={att/se_att:.2e}, p-value, CI) may be unreliable. "
f"Consider checking: (1) model fit diagnostics (R², residuals), "
f"(2) data quality (duplicates, errors), (3) model specification.",
UserWarning,
stacklevel=3
)
# Degrees of freedom for t-distribution inference.
# For clustered SE, use G-1 (number of clusters minus 1) for more
# conservative inference with few clusters. This accounts for the
# fact that cluster-robust SE estimation effectively estimates G
# cluster-level quantities, reducing effective degrees of freedom.
if vce == 'cluster':
df = G - 1
else:
df = results.df_resid
t_stat = att / se_att
# Two-sided p-value using the t-distribution CDF.
# Multiply by 2 for two-sided test under H0: ATT = 0.
pvalue = 2 * stats.t.cdf(-abs(t_stat), df)
# Confidence interval using t critical value.
# Under homoskedasticity and normality, this provides exact coverage.
# With HC/cluster SE, coverage is asymptotically valid.
t_crit = stats.t.ppf(1 - alpha / 2, df)
ci_lower = att - t_crit * se_att
ci_upper = att + t_crit * se_att
N_treated_sample = int(data_sample[d].sum())
N_control_sample = int(len(data_sample) - N_treated_sample)
return {
'att': att,
'se_att': se_att,
't_stat': t_stat,
'pvalue': pvalue,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'params': results.params,
'bse': results.bse,
'vcov': results.cov_params(),
'resid': results.resid,
'nobs': int(results.nobs),
'df_resid': int(results.df_resid),
'df_inference': int(df), # actual df used for inference
'vce_type': vce if vce is not None else 'ols',
'cluster_var': cluster_var if vce == 'cluster' else None,
'n_clusters': G if vce == 'cluster' else None,
'controls_used': controls_used,
'controls': controls if controls_used else [],
'controls_spec': controls_spec,
'n_treated_sample': N_treated_sample,
'n_control_sample': N_control_sample,
}
[docs]
def estimate_period_effects(
data: pd.DataFrame,
ydot: str,
d: str,
tindex: str,
tpost1: int,
Tmax: int,
controls_spec: dict[str, Any] | None,
vce: str | None,
cluster_var: str | None,
period_labels: dict[int, str],
alpha: float = 0.05,
) -> pd.DataFrame:
"""
Estimate period-specific ATTs via independent cross-sectional regressions.
Iterates through post-treatment periods and estimates the treatment effect
for each period using the residualized outcome variable. Each period is
estimated via a separate OLS regression, enabling examination of treatment
effect dynamics over time.
Parameters
----------
data : pd.DataFrame
Transformed panel data containing the outcome variable.
ydot : str
Name of the residualized outcome variable.
d : str
Name of the treatment indicator column.
tindex : str
Name of the time index column.
tpost1 : int
First post-treatment period index.
Tmax : int
Last period index.
controls_spec : dict[str, Any] | None
Control variable specification dictionary returned by
``prepare_controls``. If None or if 'include' is False, regression
includes only the treatment indicator.
vce : str | None
Type of variance-covariance estimator. Same options as
``estimate_att``: None (homoskedastic), 'robust'/'hc1', 'hc3',
or 'cluster'.
cluster_var : str | None
Name of the clustering variable. Required if ``vce='cluster'``.
period_labels : dict[int, str]
Mapping from time index to human-readable period labels for output.
alpha : float, default=0.05
Significance level for confidence intervals.
Returns
-------
pd.DataFrame
A DataFrame with one row per post-treatment period. Columns:
- 'period' : str
Human-readable period label.
- 'tindex' : int
Time index value.
- 'beta' : float
Estimated ATT for the period.
- 'se' : float
Standard error.
- 'ci_lower', 'ci_upper' : float
Confidence interval bounds.
- 'tstat' : float
t-statistic for H0: ATT = 0.
- 'pval' : float
Two-sided p-value.
- 'N' : int
Number of observations in the period.
See Also
--------
estimate_att : Estimates average ATT across post-treatment periods.
prepare_controls : Prepares control specification for regression.
Notes
-----
Each period is estimated independently via cross-sectional OLS:
.. math::
\\dot{Y}_{it} = \\alpha_t + \\tau_t D_i + X_i \\beta_t +
D_i (X_i - \\bar{X}_1) \\delta_t + U_{it}
where :math:`\\dot{Y}_{it}` is the transformed outcome and
:math:`\\tau_t` is the period-specific ATT.
The estimators are consistent under no anticipation and parallel trends.
Period-specific estimates allow examination of treatment effect dynamics,
which is useful for detecting delayed effects or effect decay over time.
Periods with insufficient variation in the treatment indicator (all
treated or all control) produce NaN estimates with a warning. This can
occur due to panel attrition or unbalanced designs.
"""
results_list = []
for t in range(tpost1, Tmax + 1):
mask_t = (data[tindex] == t)
data_t = data[mask_t]
if controls_spec is not None and controls_spec['include']:
X_centered_t = controls_spec['X_centered'].loc[data_t.index]
interactions_t = controls_spec['interactions'].loc[data_t.index]
controls_list = list(controls_spec['X_mean_treated'].keys())
X_parts = [
data_t[[d]],
data_t[controls_list],
interactions_t
]
X_df = pd.concat(X_parts, axis=1)
X_t = sm.add_constant(X_df.astype(float).values)
else:
d_vals = data_t[d].astype(float).values
X_t = sm.add_constant(d_vals)
# Normalize vce parameter (case-insensitive)
vce_lower = vce.lower() if vce else None
try:
if vce_lower is None:
model_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit()
elif vce_lower == 'hc0':
model_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit(cov_type='HC0')
elif vce_lower in ('hc1', 'robust'):
model_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit(cov_type='HC1')
elif vce_lower == 'hc2':
model_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit(cov_type='HC2')
elif vce_lower == 'hc3':
model_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit(cov_type='HC3')
elif vce_lower == 'hc4':
# HC4: adaptive leverage correction for high-leverage observations.
# statsmodels does not support HC4; compute manually.
model_ols_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit()
n_obs_t = int(model_ols_t.nobs)
k_t = X_t.shape[1]
residuals_t = model_ols_t.resid
# Compute leverage values
XtX_inv_t = np.linalg.inv(X_t.T @ X_t)
tmp_t = X_t @ XtX_inv_t
h_ii_t = (tmp_t * X_t).sum(axis=1)
h_ii_t = np.clip(h_ii_t, 0, 0.9999)
# HC4 adjustment
delta_t = np.minimum(4.0, n_obs_t * h_ii_t / k_t)
delta_t = np.maximum(delta_t, 0.0)
omega_diag_t = (residuals_t ** 2) / ((1 - h_ii_t) ** delta_t)
# Sandwich variance
meat_t = X_t.T @ np.diag(omega_diag_t) @ X_t
var_beta_t = XtX_inv_t @ meat_t @ XtX_inv_t
# Create mock results
class HC4ResultsT:
def __init__(self, ols_results, var_beta):
self.params = ols_results.params
# Handle both Series and ndarray for params
if hasattr(ols_results.params, 'index'):
self.bse = pd.Series(np.sqrt(np.diag(var_beta)), index=ols_results.params.index)
else:
self.bse = np.sqrt(np.diag(var_beta))
self.df_resid = ols_results.df_resid
self.nobs = ols_results.nobs
model_t = HC4ResultsT(model_ols_t, var_beta_t)
elif vce_lower == 'cluster':
if cluster_var is None:
raise InvalidParameterError(
"vce='cluster' requires cluster_var parameter to be specified."
)
if cluster_var not in data_t.columns:
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' not found in data for period t={t}."
)
if data_t[cluster_var].isna().any():
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' contains missing values in period t={t}."
)
n_clusters_t = data_t[cluster_var].nunique()
if n_clusters_t < 2:
raise InvalidParameterError(
f"Cluster variable '{cluster_var}' must have at least 2 unique values in period t={t}. "
f"Found: {n_clusters_t}"
)
cluster_groups_t = data_t[cluster_var].values
model_t = sm.OLS(data_t[ydot], X_t, missing='drop').fit(
cov_type='cluster',
cov_kwds={'groups': cluster_groups_t}
)
else:
raise InvalidVCETypeError(
f"Invalid vce type: '{vce}'. "
f"Valid options: None, 'hc0', 'hc1', 'hc2', 'hc3', 'hc4', 'robust', 'cluster'"
)
params_vals = model_t.params.values if hasattr(model_t.params, 'values') else model_t.params
bse_vals = model_t.bse.values if hasattr(model_t.bse, 'values') else model_t.bse
beta_t = params_vals[1]
se_t = bse_vals[1]
if se_t == 0 or np.isnan(se_t) or np.isinf(se_t):
period_label = period_labels.get(t, str(t))
warnings.warn(
f"Period {period_label} (t={t}) regression produced degenerate results "
f"(se={se_t}). This likely indicates perfect separation (all d=0 or all d=1) "
f"or insufficient variation in treatment variable. Setting results to NaN.",
category=UserWarning,
stacklevel=2
)
beta_t = se_t = t_stat = p_val = np.nan
ci_lower = ci_upper = np.nan
N_t = int(model_t.nobs)
else:
# Select degrees of freedom based on variance estimator.
# Cluster-robust uses G-1 for conservative inference.
if vce_lower == 'cluster':
n_clusters_t = len(np.unique(cluster_groups_t))
df_t = n_clusters_t - 1
else:
df_t = model_t.df_resid
t_stat = beta_t / se_t
# Two-sided p-value for H0: tau_t = 0.
p_val = 2 * stats.t.cdf(-abs(t_stat), df_t)
N_t = int(model_t.nobs)
# Confidence interval with t critical value.
t_crit_t = stats.t.ppf(1 - alpha / 2, df_t)
ci_lower = beta_t - t_crit_t * se_t
ci_upper = beta_t + t_crit_t * se_t
except Exception as e:
period_label = period_labels.get(t, str(t))
warnings.warn(
f"Period {period_label} (t={t}) regression failed: {str(e)}. "
f"Setting results to NaN. This may indicate insufficient sample size, "
f"perfect separation (all d=0 or all d=1), or numerical issues. "
f"Check data quality for this period.",
category=UserWarning,
stacklevel=2
)
beta_t = se_t = t_stat = p_val = np.nan
ci_lower = ci_upper = np.nan
valid_mask = data_t[ydot].notna() & data_t[d].notna()
if controls_spec is not None and 'X_centered' in controls_spec:
for col in controls_spec['X_centered'].columns:
if col in data_t.columns:
valid_mask &= data_t[col].notna()
N_t = int(valid_mask.sum())
results_list.append({
'period': period_labels.get(t, str(t)),
'tindex': t,
'beta': beta_t,
'se': se_t,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'tstat': t_stat,
'pval': p_val,
'N': N_t
})
return pd.DataFrame(results_list)