"""
Treatment effect estimators for difference-in-differences settings.
This module provides inverse probability weighting (IPW), inverse probability
weighted regression adjustment (IPWRA), and propensity score matching (PSM)
estimators for ATT estimation. These estimators handle covariate imbalance
between treated and control groups through propensity score methods.
The IPW estimator reweights control observations to match the covariate
distribution of treated units, providing consistent ATT estimates when the
propensity score model is correctly specified.
The IPWRA estimator combines propensity score weighting with outcome regression
to achieve double robustness: consistency requires correct specification of
either the propensity score model or the outcome regression model, but not
necessarily both. This property makes IPWRA particularly valuable when there
is uncertainty about functional form assumptions.
The PSM estimator uses nearest-neighbor matching on estimated propensity scores,
with options for caliper constraints, replacement, and multiple matches per
treated unit. Standard errors use either a heteroskedasticity-robust formula
or bootstrap resampling.
Notes
-----
All estimators take transformed outcome variables (after unit-specific
demeaning or detrending) as inputs and return ATT estimates for specific
cohort-time pairs in staggered adoption designs. The propensity score model
uses logistic regression without regularization for unbiased coefficient
estimates.
"""
from __future__ import annotations
import warnings
from dataclasses import dataclass
from typing import Any
import numpy as np
import pandas as pd
import scipy.stats as stats
# ============================================================================
# IPW (Inverse Probability Weighting) Estimator
# ============================================================================
[docs]
@dataclass
class IPWResult:
"""
Result container for inverse probability weighting ATT estimation.
Attributes
----------
att : float
ATT point estimate.
se : float
Standard error.
ci_lower : float
Lower bound of confidence interval.
ci_upper : float
Upper bound of confidence interval.
t_stat : float
t-statistic.
pvalue : float
Two-sided p-value.
propensity_scores : np.ndarray
Estimated propensity scores.
weights : np.ndarray
IPW weights w = p/(1-p) for control units.
propensity_model_coef : dict[str, float]
Propensity score model coefficients.
n_treated : int
Number of treated units.
n_control : int
Number of control units.
df_resid : int
Residual degrees of freedom.
df_inference : int
Degrees of freedom for inference.
weights_cv : float
Coefficient of variation of IPW weights.
diagnostics : dict[str, Any] or None
Additional diagnostic information.
"""
att: float
se: float
ci_lower: float
ci_upper: float
t_stat: float
pvalue: float
propensity_scores: np.ndarray
weights: np.ndarray
propensity_model_coef: dict[str, float]
n_treated: int
n_control: int
df_resid: int = 0
df_inference: int = 0
weights_cv: float = 0.0
diagnostics: dict[str, Any] | None = None
# ============================================================================
# IPWRA (Doubly Robust) Estimator
# ============================================================================
[docs]
@dataclass
class IPWRAResult:
"""
Result container for doubly robust IPWRA ATT estimation.
Attributes
----------
att : float
ATT point estimate.
se : float
Standard error.
ci_lower : float
Lower bound of confidence interval.
ci_upper : float
Upper bound of confidence interval.
t_stat : float
t-statistic.
pvalue : float
Two-sided p-value.
propensity_scores : np.ndarray
Estimated propensity scores.
weights : np.ndarray
IPW weights w = p/(1-p) for control units.
outcome_model_coef : dict[str, float]
Outcome model coefficients.
propensity_model_coef : dict[str, float]
Propensity score model coefficients.
n_treated : int
Number of treated units.
n_control : int
Number of control units.
"""
att: float
se: float
ci_lower: float
ci_upper: float
t_stat: float
pvalue: float
propensity_scores: np.ndarray
weights: np.ndarray
outcome_model_coef: dict[str, float]
propensity_model_coef: dict[str, float]
n_treated: int
n_control: int
[docs]
def estimate_ipw(
data: pd.DataFrame,
y: str,
d: str,
propensity_controls: list[str],
trim_threshold: float = 0.01,
alpha: float = 0.05,
vce: str | None = None,
return_diagnostics: bool = False,
gvar_col: str | None = None,
ivar_col: str | None = None,
cohort_g: int | None = None,
period_r: int | None = None,
) -> IPWResult:
"""
Estimate ATT using inverse probability weighting.
IPW estimates the average treatment effect on the treated by reweighting
control observations to match the covariate distribution of treated units.
Parameters
----------
data : pd.DataFrame
Dataset containing outcome, treatment, and control variables.
y : str
Outcome variable column name.
d : str
Treatment indicator column name (1=treated, 0=control).
propensity_controls : list[str]
Variables for propensity score model.
trim_threshold : float, default=0.01
Propensity score trimming threshold. Observations with propensity
scores outside [trim_threshold, 1-trim_threshold] are excluded.
alpha : float, default=0.05
Significance level for confidence intervals.
vce : str, optional
Variance estimator type ('robust', 'hc0', 'hc1', 'hc2', 'hc3').
return_diagnostics : bool, default=False
Whether to return additional diagnostic information.
gvar_col : str, optional
Column name for cohort indicator (for staggered designs).
ivar_col : str, optional
Column name for unit identifier (for staggered designs).
cohort_g : int, optional
Cohort value (for staggered designs).
period_r : int, optional
Period value (for staggered designs).
Returns
-------
IPWResult
Estimation results including ATT, standard error, and diagnostics.
Raises
------
ValueError
If no propensity controls are specified, no treated or control units
exist, or all observations are trimmed.
Notes
-----
The IPW estimator for ATT is:
.. math::
\\hat{\\tau}_{IPW} = \\frac{1}{N_1} \\sum_{i:D_i=1} Y_i
- \\frac{1}{N_1} \\sum_{i:D_i=0} \\frac{\\hat{p}(X_i)}{1-\\hat{p}(X_i)} Y_i
where :math:`\\hat{p}(X_i)` is the estimated propensity score.
Inference uses the normal distribution based on influence function
asymptotics, consistent with IPWRA and PSM estimators in this package.
"""
# Validate inputs.
if not propensity_controls:
raise ValueError("propensity_controls must be specified for IPW estimation.")
# Extract data.
y_values = data[y].values
d_values = data[d].values.astype(int)
X_ps = data[propensity_controls].values
treated_mask = d_values == 1
control_mask = d_values == 0
n_treated = treated_mask.sum()
n_control = control_mask.sum()
if n_treated == 0:
raise ValueError("No treated units in the data.")
if n_control == 0:
raise ValueError("No control units in the data.")
# Estimate propensity scores.
ps, ps_coef = estimate_propensity_score(
data=data,
d=d,
controls=propensity_controls,
trim_threshold=trim_threshold,
)
# Compute trimmed mask based on threshold.
trimmed_mask = (ps < trim_threshold) | (ps > 1 - trim_threshold)
# Apply trimming.
valid_mask = ~trimmed_mask
if valid_mask.sum() == 0:
raise ValueError("All observations were trimmed. Adjust trim_threshold.")
y_valid = y_values[valid_mask]
d_valid = d_values[valid_mask]
ps_valid = ps[valid_mask]
treated_valid = d_valid == 1
control_valid = d_valid == 0
n_treated_valid = treated_valid.sum()
n_control_valid = control_valid.sum()
if n_treated_valid == 0:
raise ValueError("No treated units remain after trimming.")
if n_control_valid == 0:
raise ValueError("No control units remain after trimming.")
# IPW 权重 w = p/(1-p),用于重加权控制组以匹配处理组的协变量分布。
w_raw = ps_valid[control_valid] / (1 - ps_valid[control_valid])
w_raw_sum = w_raw.sum()
# 归一化权重(用于返回值和诊断,不影响 ATT 估计)。
weights = np.zeros(len(y_valid))
weights[control_valid] = w_raw * n_treated_valid / w_raw_sum if w_raw_sum > 0 else 0.0
# ATT(Hajek 形式):mean_treated(Y) - sum_ctrl(w*Y) / sum_ctrl(w)。
y_treated_mean = y_valid[treated_valid].mean()
y_control_weighted = np.sum(w_raw * y_valid[control_valid]) / w_raw_sum
att = y_treated_mean - y_control_weighted
# ================================================================
# M-estimation influence function(含 PS 估计不确定性调整)。
#
# IPW-ATT 不具有 doubly robust 性质,PS 估计的不确定性对 SE 有
# 显著影响。使用 Horvitz-Thompson IF + PS score 调整。
#
# 参考:Lunceford & Davidian (2004), Wooldridge (2010, Ch.21)
# ================================================================
n_valid = len(y_valid)
D_valid = d_valid.astype(float)
p_bar = n_treated_valid / n_valid
# HT 形式的 influence function。
psi_ht = np.zeros(n_valid)
psi_ht[treated_valid] = (y_valid[treated_valid] - att) / p_bar
psi_ht[control_valid] = -w_raw * y_valid[control_valid] / p_bar
# PS 估计不确定性的 M-estimation 调整。
# 构造设计矩阵(含截距)。
X_ps_valid = data[propensity_controls].values[valid_mask].astype(float)
X_const = np.column_stack([np.ones(n_valid), X_ps_valid])
# Logit score: S_i = (D_i - p_i) * X_i。
S = (D_valid - ps_valid)[:, np.newaxis] * X_const
# Logit Hessian: H = -(1/N) * X' diag(p*(1-p)) X。
W_ps = ps_valid * (1 - ps_valid)
H = -(X_const.T * W_ps) @ X_const / n_valid
try:
H_inv = np.linalg.inv(H)
except np.linalg.LinAlgError:
H_inv = np.linalg.pinv(H)
# dATT_HT/dgamma = -sum_ctrl(dw/dgamma * Y) / (N * p_bar)。
# 其中 dw/dgamma = p/(1-p) * X(对 control units)。
dw_dgamma_ctrl = w_raw[:, np.newaxis] * X_const[control_valid]
Y_ctrl = y_valid[control_valid]
dATT_dgamma = -(dw_dgamma_ctrl * Y_ctrl[:, np.newaxis]).sum(axis=0) / (n_valid * p_bar)
# M-estimation 调整: psi = psi_ht - dATT/dgamma * H^{-1} * S_i。
adjustment = (S @ H_inv.T) @ dATT_dgamma
psi = psi_ht - adjustment
# 方差估计。
var_att = np.var(psi, ddof=1) / n_valid
se = np.sqrt(var_att)
# Degrees of freedom retained for interface compatibility; inference uses normal distribution.
df_resid = n_treated_valid + n_control_valid - 2
df_resid = max(1, df_resid)
df_inference = df_resid
# z-statistic and p-value using normal distribution (asymptotic inference).
if se > 0:
t_stat = att / se
pvalue = 2 * (1 - stats.norm.cdf(abs(t_stat)))
else:
t_stat = np.nan
pvalue = np.nan
# Confidence interval using normal distribution (asymptotic inference).
z_crit = stats.norm.ppf(1 - alpha / 2)
ci_lower = att - z_crit * se
ci_upper = att + z_crit * se
# Coefficient of variation diagnoses overlap violations; high CV indicates extreme weights.
weights_control = weights[control_valid]
if len(weights_control) > 1:
weights_mean = np.mean(weights_control)
weights_std = np.std(weights_control, ddof=1)
weights_cv = weights_std / weights_mean if weights_mean > 0 else np.nan
else:
weights_cv = 0.0
return IPWResult(
att=att,
se=se,
ci_lower=ci_lower,
ci_upper=ci_upper,
t_stat=t_stat,
pvalue=pvalue,
propensity_scores=ps,
weights=weights,
propensity_model_coef=ps_coef,
n_treated=n_treated_valid,
n_control=n_control_valid,
df_resid=df_resid,
df_inference=df_inference,
weights_cv=weights_cv,
)
[docs]
def estimate_ipwra(
data: pd.DataFrame,
y: str,
d: str,
controls: list[str],
propensity_controls: list[str] | None = None,
trim_threshold: float = 0.01,
se_method: str = 'analytical',
n_bootstrap: int = 200,
seed: int | None = None,
alpha: float = 0.05,
return_diagnostics: bool = False,
gvar_col: str | None = None,
ivar_col: str | None = None,
cohort_g: int | None = None,
period_r: int | None = None,
) -> IPWRAResult:
"""
Estimate ATT using inverse probability weighted regression adjustment.
IPWRA is a doubly robust estimator that combines propensity score weighting
with outcome regression. The estimator remains consistent if either the
propensity score model or the outcome regression model is correctly
specified, providing protection against model misspecification.
The IPWRA-ATT estimator is computed as:
.. math::
\\hat{\\tau} = \\frac{1}{N_1} \\sum_{D_i=1} [Y_i - \\hat{m}_0(X_i)]
- \\frac{\\sum_{D_i=0} w_i (Y_i - \\hat{m}_0(X_i))}
{\\sum_{D_i=0} w_i}
where :math:`\\hat{m}_0(X)` is the estimated outcome regression for controls,
and :math:`w_i = \\hat{p}(X_i) / (1 - \\hat{p}(X_i))` are IPW weights.
Parameters
----------
data : pd.DataFrame
Cross-sectional data with one row per unit.
y : str
Outcome variable column name (typically transformed outcome).
d : str
Treatment indicator column name (1=treated, 0=control).
controls : list[str]
Control variables for the outcome regression model.
propensity_controls : list[str], optional
Control variables for the propensity score model.
If None, uses the same variables as ``controls``.
trim_threshold : float, default=0.01
Propensity score trimming threshold. Scores are clipped to
[trim_threshold, 1-trim_threshold] to prevent extreme weights.
se_method : str, default='analytical'
Standard error computation method: 'analytical' uses influence
functions, 'bootstrap' uses nonparametric bootstrap.
n_bootstrap : int, default=200
Number of bootstrap replications when se_method='bootstrap'.
seed : int, optional
Random seed for bootstrap resampling.
alpha : float, default=0.05
Significance level for confidence intervals.
return_diagnostics : bool, default=False
Whether to return additional diagnostic information.
gvar_col : str, optional
Cohort indicator column name (for staggered designs).
ivar_col : str, optional
Unit identifier column name (for staggered designs).
cohort_g : int, optional
Cohort value (for staggered designs).
period_r : int, optional
Period value (for staggered designs).
Returns
-------
IPWRAResult
Estimation results containing ATT estimate, standard error,
confidence interval, propensity scores, and model coefficients.
Raises
------
ValueError
If required columns are missing, sample sizes are insufficient,
or model estimation fails to converge.
See Also
--------
estimate_ipw : Pure IPW estimator without outcome regression.
estimate_psm : Propensity score matching estimator.
"""
# Validate inputs.
if y not in data.columns:
raise ValueError(f"Outcome variable '{y}' not found in data.")
if d not in data.columns:
raise ValueError(f"Treatment indicator '{d}' not found in data.")
missing_controls = [c for c in controls if c not in data.columns]
if missing_controls:
raise ValueError(f"Control variables not found: {missing_controls}")
if propensity_controls is None:
propensity_controls = controls
else:
missing_ps = [c for c in propensity_controls if c not in data.columns]
if missing_ps:
raise ValueError(f"Propensity score controls not found: {missing_ps}")
# Remove observations with missing values.
all_vars = [y, d] + list(set(controls + propensity_controls))
data_clean = data[all_vars].dropna().copy()
n = len(data_clean)
D = data_clean[d].values.astype(float)
Y = data_clean[y].values.astype(float)
n_treated = int(D.sum())
n_control = int((1 - D).sum())
if n_treated < 2:
raise ValueError(f"Insufficient treated units: n_treated={n_treated}, requires at least 2.")
if n_control < 2:
raise ValueError(f"Insufficient control units: n_control={n_control}, requires at least 2.")
if n_treated < 5:
warnings.warn(
f"Small treated sample (n_treated={n_treated}); IPWRA estimates may be unstable.",
UserWarning
)
if n_control < 10:
warnings.warn(
f"Small control sample (n_control={n_control}); outcome model may be unstable.",
UserWarning
)
# Estimate propensity scores.
pscores, ps_coef = estimate_propensity_score(
data_clean, d, propensity_controls, trim_threshold
)
# Compute ATT weights for WLS outcome model estimation.
# IPWRA uses WLS with w_i = 1 for treated units and w_i = p(X)/(1-p(X)) for
# controls. This reweights the control sample to match the treated covariate
# distribution, ensuring the outcome model targets the ATT estimand.
att_weights = np.where(D == 1, 1.0, pscores / (1 - pscores))
# Estimate outcome model on control units using WLS with ATT weights.
m0_hat, outcome_coef = estimate_outcome_model(
data_clean, y, d, controls, sample_weights=att_weights
)
# Compute IPWRA-ATT estimator.
treat_mask = D == 1
control_mask = D == 0
# Treated component: (1/N_1) sum_{D=1} [Y - m_0(X)].
treat_term = (Y[treat_mask] - m0_hat[treat_mask]).mean()
# Weighted control component.
weights = pscores / (1 - pscores)
weights_control = weights[control_mask]
residuals_control = Y[control_mask] - m0_hat[control_mask]
# Check for extreme weights indicating overlap violation.
weights_cv = np.std(weights_control) / np.mean(weights_control) if np.mean(weights_control) > 0 else np.inf
if weights_cv > 2.0:
warnings.warn(
f"Extreme IPW weights detected (CV={weights_cv:.2f} > 2.0), indicating potential "
f"overlap violation. Consider checking propensity score distribution or increasing "
f"trim_threshold.",
UserWarning
)
# Check proportion of extreme propensity scores.
extreme_low = (pscores < 0.05).mean()
extreme_high = (pscores > 0.95).mean()
if extreme_low > 0.1 or extreme_high > 0.1:
warnings.warn(
f"High proportion of extreme propensity scores (p<0.05: {extreme_low:.1%}, "
f"p>0.95: {extreme_high:.1%}), suggesting overlap assumption may be violated. "
f"Consider increasing trim_threshold or reviewing covariate selection.",
UserWarning
)
weights_sum = weights_control.sum()
if weights_sum <= 0:
raise ValueError("Sum of IPW weights is non-positive; propensity score model may be misspecified.")
control_term = (weights_control * residuals_control).sum() / weights_sum
att = treat_term - control_term
# Compute standard errors.
if se_method == 'analytical':
se, ci_lower, ci_upper = compute_ipwra_se_analytical(
data_clean, y, d, controls,
att, pscores, m0_hat, weights, alpha,
propensity_controls=propensity_controls,
)
elif se_method == 'bootstrap':
se, ci_lower, ci_upper = compute_ipwra_se_bootstrap(
data_clean, y, d, controls, propensity_controls,
trim_threshold, n_bootstrap, seed, alpha
)
else:
raise ValueError(f"Unknown se_method: {se_method}. Use 'analytical' or 'bootstrap'.")
# Compute t-statistic and p-value.
if se > 0:
t_stat = att / se
pvalue = 2 * (1 - stats.norm.cdf(abs(t_stat)))
else:
t_stat = np.nan
pvalue = np.nan
return IPWRAResult(
att=att,
se=se,
ci_lower=ci_lower,
ci_upper=ci_upper,
t_stat=t_stat,
pvalue=pvalue,
propensity_scores=pscores,
weights=weights,
outcome_model_coef=outcome_coef,
propensity_model_coef=ps_coef,
n_treated=n_treated,
n_control=n_control,
)
[docs]
def estimate_propensity_score(
data: pd.DataFrame,
d: str,
controls: list[str],
trim_threshold: float = 0.01,
) -> tuple[np.ndarray, dict[str, float]]:
"""
Estimate propensity scores using logistic regression.
Fits a logit model P(D=1|X) without regularization, using maximum
likelihood estimation for unbiased coefficient estimates.
Parameters
----------
data : pd.DataFrame
Dataset containing treatment and control variables.
d : str
Treatment indicator column name.
controls : list[str]
Covariate column names for the propensity score model.
trim_threshold : float, default=0.01
Trimming threshold for extreme propensity scores.
Returns
-------
propensity_scores : np.ndarray
Estimated propensity scores, trimmed to [trim_threshold, 1-trim_threshold].
coefficients : dict[str, float]
Dictionary mapping variable names to estimated coefficients,
including '_intercept' for the constant term.
Raises
------
ValueError
If the logistic regression fails to converge.
"""
from sklearn.linear_model import LogisticRegression
X = data[controls].values.astype(float)
D = data[d].values.astype(float)
# Standardize covariates for numerical stability.
X_mean = X.mean(axis=0)
X_std = X.std(axis=0)
X_std[X_std == 0] = 1 # Avoid division by zero for constant columns.
X_scaled = (X - X_mean) / X_std
# No regularization ensures unbiased coefficient estimates.
try:
model = LogisticRegression(
penalty=None,
solver='lbfgs',
max_iter=1000,
tol=1e-8,
)
model.fit(X_scaled, D)
except Exception as e:
raise ValueError(f"Propensity score model estimation failed: {e}")
# Predict propensity scores.
pscores = model.predict_proba(X_scaled)[:, 1]
# Trimming prevents extreme IPW weights that inflate variance.
pscores = np.clip(pscores, trim_threshold, 1 - trim_threshold)
# Transform coefficients back to original scale.
coef_scaled = model.coef_[0]
intercept = model.intercept_[0]
coef_original = coef_scaled / X_std
intercept_original = intercept - (coef_scaled * X_mean / X_std).sum()
coef_dict = {'_intercept': intercept_original}
for i, name in enumerate(controls):
coef_dict[name] = coef_original[i]
return pscores, coef_dict
[docs]
def estimate_outcome_model(
data: pd.DataFrame,
y: str,
d: str,
controls: list[str],
sample_weights: np.ndarray | None = None,
) -> tuple[np.ndarray, dict[str, float]]:
"""
Estimate the outcome regression model on control units.
Fits a linear regression E(Y|X, D=0) using control observations only,
then generates predicted values for all units. Optionally uses weighted
least squares (WLS) when sample_weights are provided.
Parameters
----------
data : pd.DataFrame
Dataset containing outcome, treatment, and control variables.
y : str
Outcome variable column name.
d : str
Treatment indicator column name.
controls : list[str]
Covariate column names for the outcome model.
sample_weights : np.ndarray, optional
Weights for weighted least squares estimation. If provided, must have
the same length as the data. Only weights for control units (D=0) are
used in fitting. For IPWRA with ATT targeting, weights should be
w_i = p(X_i) / (1 - p(X_i)) to reweight controls to the treated
covariate distribution.
Returns
-------
predictions : np.ndarray
Predicted outcome values for all units based on control regression.
coefficients : dict[str, float]
Dictionary mapping variable names to estimated coefficients,
including '_intercept' for the constant term.
Raises
------
ValueError
If the design matrix is singular and cannot be inverted.
Notes
-----
When sample_weights are provided, the outcome model is estimated using WLS:
.. math::
\\hat{\\beta} = (X'WX)^{-1} X'WY
where W is the diagonal weight matrix. For IPWRA with ATT targeting, the
weights for control units should be w_i = p(X_i) / (1 - p(X_i)), which
reweights the control sample to match the treated covariate distribution.
"""
D = data[d].values.astype(float)
Y = data[y].values.astype(float)
X = data[controls].values.astype(float)
# Extract control group data.
control_mask = D == 0
X_control = X[control_mask]
Y_control = Y[control_mask]
# Add intercept term.
X_control_const = np.column_stack([np.ones(len(X_control)), X_control])
if sample_weights is not None:
# 验证权重有效性(BUG-065 修复)
if not np.all(np.isfinite(sample_weights)):
raise ValueError(
"Invalid weights: sample_weights contain non-finite values "
"(inf or NaN). All weights must be finite."
)
w_mean = sample_weights[control_mask].mean() if control_mask.any() else 0.0
if w_mean <= 0 or not np.isfinite(w_mean):
raise ValueError(
f"Invalid weights: control unit weights have mean={w_mean:.6g}. "
"Weights must have a positive finite mean for normalization."
)
# Weighted Least Squares (WLS) estimation.
# Extract weights for control units only.
w_control = sample_weights[control_mask]
# Ensure weights are positive (clip near-zero values for numerical stability).
w_control = np.maximum(w_control, 1e-10)
# WLS via sqrt(W) transformation: beta = (X̃'X̃)^{-1} X̃'Ỹ.
sqrt_w = np.sqrt(w_control)
X_weighted = X_control_const * sqrt_w[:, np.newaxis]
Y_weighted = Y_control * sqrt_w
try:
XtWX_inv = np.linalg.inv(X_weighted.T @ X_weighted)
beta = XtWX_inv @ (X_weighted.T @ Y_weighted)
except np.linalg.LinAlgError:
raise ValueError("Weighted outcome model design matrix is singular; cannot estimate coefficients.")
else:
# Standard OLS estimation.
try:
XtX_inv = np.linalg.inv(X_control_const.T @ X_control_const)
beta = XtX_inv @ (X_control_const.T @ Y_control)
except np.linalg.LinAlgError:
raise ValueError("Outcome model design matrix is singular; cannot estimate coefficients.")
# Generate predictions for all units.
X_all_const = np.column_stack([np.ones(len(X)), X])
m0_hat = X_all_const @ beta
# Store coefficients.
coef_dict = {'_intercept': beta[0]}
for i, name in enumerate(controls):
coef_dict[name] = beta[i + 1]
return m0_hat, coef_dict
def compute_ipwra_se_analytical(
data: pd.DataFrame,
y: str,
d: str,
controls: list[str],
att: float,
pscores: np.ndarray,
m0_hat: np.ndarray,
weights: np.ndarray,
alpha: float = 0.05,
propensity_controls: list[str] | None = None,
) -> tuple[float, float, float]:
"""
Compute IPWRA standard error using the full semiparametric IF.
Implements the complete influence function matching Stata ``teffects ipwra
... , atet`` default standard errors. The full IF consists of three
components derived from the stacked M-estimation framework (Cattaneo 2010,
Newey & McFadden 1994):
1. **Hajek IF**: linearization of the IPWRA-ATT Hajek ratio estimator
2. **PS adjustment**: M-estimation correction for propensity score
estimation uncertainty via logit score
3. **Outcome model adjustment**: M-estimation correction for WLS outcome
regression estimation uncertainty
The full IF is:
.. math::
\\psi_i = \\psi^{\\text{main}}_i
- \\left(\\frac{\\partial \\tau}{\\partial \\beta_0}\\right)^\\top
H_{\\beta}^{-1} S_{\\beta,i}
- \\left(\\frac{\\partial \\tau}{\\partial \\gamma}\\right)^\\top
H_{\\gamma}^{-1} S_{\\gamma,i}
where:
- :math:`\\partial\\tau/\\partial\\beta_0 = \\bar{X}_{0,w} - \\bar{X}_1`
(weighted control mean minus treated mean of covariates)
- :math:`\\partial\\tau/\\partial\\gamma = -\\sum_{D=0} w_i X_i (r_i - B)
/ \\sum_{D=0} w_i` (ATT sensitivity to PS parameters)
- :math:`S_{\\beta,i}` and :math:`H_{\\beta}` are the WLS score and
Hessian for the outcome model
- :math:`S_{\\gamma,i}` and :math:`H_{\\gamma}` are the logit score and
Hessian for the PS model
Parameters
----------
data : pd.DataFrame
Dataset used for estimation.
y : str
Outcome variable column name.
d : str
Treatment indicator column name.
controls : list[str]
Outcome model control variable column names.
att : float
Estimated ATT value.
pscores : np.ndarray
Estimated propensity scores.
m0_hat : np.ndarray
Predicted outcomes from control regression.
weights : np.ndarray
IPW weights (p/(1-p)) for all observations.
alpha : float, default=0.05
Significance level for confidence interval.
propensity_controls : list[str], optional
PS model control variable column names. If None, uses ``controls``.
Returns
-------
se : float
Standard error estimate.
ci_lower : float
Lower bound of confidence interval.
ci_upper : float
Upper bound of confidence interval.
References
----------
Cattaneo, M. D. (2010). Efficient semiparametric estimation of
multi-valued treatment effects under ignorability. *Journal of
Econometrics*, 155(2), 138-154.
Newey, W. K. & McFadden, D. (1994). Large sample estimation and
hypothesis testing. *Handbook of Econometrics*, Vol. 4, Ch. 36.
Lunceford, J. K. & Davidian, M. (2004). Stratification and weighting
via the propensity score in estimation of causal treatment effects.
*Statistics in Medicine*, 23(19), 2937-2960.
"""
if propensity_controls is None:
propensity_controls = controls
D = data[d].values.astype(float)
Y = data[y].values.astype(float)
X_out = data[controls].values.astype(float)
X_ps = data[propensity_controls].values.astype(float)
n = len(D)
n_treated = D.sum()
treat_mask = D == 1
control_mask = D == 0
p_bar = n_treated / n
weights_ctrl = weights[control_mask]
weights_sum = weights_ctrl.sum()
# ================================================================
# 组件 1: Hajek IF(主项)
# ================================================================
residuals_control = Y[control_mask] - m0_hat[control_mask]
control_term = (weights_ctrl * residuals_control).sum() / weights_sum
psi_treat = (Y[treat_mask] - m0_hat[treat_mask] - att) / p_bar
psi_control = -weights_ctrl * (residuals_control - control_term) / weights_sum * n
psi = np.zeros(n)
psi[treat_mask] = psi_treat
psi[control_mask] = psi_control
# ================================================================
# 组件 2: PS 估计不确定性调整
#
# ∂τ/∂γ = -Σ_{D=0} w_i X_i (r_i - B) / Σ_{D=0} w_i
#
# Logit score: S_γ_i = (D_i - p̂_i) · X_i
# Logit Hessian: H_γ = -(1/N) X' diag(p̂(1-p̂)) X
#
# 调整项: -(∂τ/∂γ)' H_γ⁻¹ S_γ_i
# ================================================================
X_ps_const = np.column_stack([np.ones(n), X_ps])
# Logit score。
S_gamma = (D - pscores)[:, np.newaxis] * X_ps_const
# Logit Hessian。
W_ps = pscores * (1 - pscores)
H_gamma = -(X_ps_const.T * W_ps) @ X_ps_const / n
try:
H_gamma_inv = np.linalg.inv(H_gamma)
except np.linalg.LinAlgError:
H_gamma_inv = np.linalg.pinv(H_gamma)
# ∂τ/∂γ: ∂w_i/∂γ = w_i · X_i(logit 链式法则)。
r_minus_B = residuals_control - control_term
dw_dgamma_ctrl = weights_ctrl[:, np.newaxis] * X_ps_const[control_mask]
dATT_dgamma = -(dw_dgamma_ctrl * r_minus_B[:, np.newaxis]).sum(axis=0) / weights_sum
# PS 调整: -(∂τ/∂γ)' H_γ⁻¹ S_γ_i。
ps_adjustment = (S_gamma @ H_gamma_inv.T) @ dATT_dgamma
# ================================================================
# 组件 3: Outcome Model 估计不确定性调整
#
# ∂τ/∂β₀ = -X̄₁ + X̄₀_w
# X̄₁ = (1/N₁) Σ_{D=1} X_i(处理组均值)
# X̄₀_w = Σ_{D=0} w_i X_i / Σ_{D=0} w_i(控制组加权均值)
#
# WLS score: S_β_i = (1-D_i) · w_i · (Y_i - X_i'β̂₀) · X_i
# WLS Hessian: H_β = -(1/N) Σ_{D=0} w_i X_i X_i'
#
# 调整项: -(∂τ/∂β₀)' H_β⁻¹ S_β_i
# ================================================================
X_out_const = np.column_stack([np.ones(n), X_out])
X_ctrl_const = X_out_const[control_mask]
# WLS score(仅控制组非零)。
S_beta = np.zeros((n, X_out_const.shape[1]))
S_beta[control_mask] = (
weights_ctrl[:, np.newaxis]
* residuals_control[:, np.newaxis]
* X_ctrl_const
)
# WLS Hessian: H_β = -(1/N) Σ_{D=0} w_i X_i X_i'。
H_beta = -(X_ctrl_const.T * weights_ctrl) @ X_ctrl_const / n
try:
H_beta_inv = np.linalg.inv(H_beta)
except np.linalg.LinAlgError:
H_beta_inv = np.linalg.pinv(H_beta)
# ∂τ/∂β₀ = -X̄₁ + X̄₀_w。
X_bar_treated = X_out_const[treat_mask].mean(axis=0)
X_bar_ctrl_w = (weights_ctrl[:, np.newaxis] * X_ctrl_const).sum(axis=0) / weights_sum
dATT_dbeta = -X_bar_treated + X_bar_ctrl_w
# Outcome model 调整: -(∂τ/∂β₀)' H_β⁻¹ S_β_i。
om_adjustment = (S_beta @ H_beta_inv.T) @ dATT_dbeta
# ================================================================
# 合并完整 IF
# ================================================================
psi_full = psi - ps_adjustment - om_adjustment
# 方差估计。
var_psi = np.var(psi_full, ddof=1)
var_att = var_psi / n
se = np.sqrt(var_att)
# 置信区间。
z_crit = stats.norm.ppf(1 - alpha / 2)
ci_lower = att - z_crit * se
ci_upper = att + z_crit * se
return se, ci_lower, ci_upper
def compute_ipwra_se_bootstrap(
data: pd.DataFrame,
y: str,
d: str,
controls: list[str],
propensity_controls: list[str] | None,
trim_threshold: float,
n_bootstrap: int,
seed: int | None,
alpha: float,
) -> tuple[float, float, float]:
"""
Compute IPWRA standard error using nonparametric bootstrap.
Resamples the entire estimation procedure including propensity score
estimation, outcome model fitting, and ATT computation. This approach
properly accounts for all sources of estimation uncertainty.
Parameters
----------
data : pd.DataFrame
Dataset used for estimation.
y : str
Outcome variable column name.
d : str
Treatment indicator column name.
controls : list[str]
Control variable column names for outcome model.
propensity_controls : list[str] or None
Control variable column names for propensity score model.
trim_threshold : float
Propensity score trimming threshold.
n_bootstrap : int
Number of bootstrap replications.
seed : int or None
Random seed for reproducibility.
alpha : float
Significance level for confidence interval.
Returns
-------
se : float
Bootstrap standard error.
ci_lower : float
Percentile confidence interval lower bound.
ci_upper : float
Percentile confidence interval upper bound.
"""
if seed is not None:
np.random.seed(seed)
if propensity_controls is None:
propensity_controls = controls
n = len(data)
att_boots = []
for _ in range(n_bootstrap):
# Resample with replacement.
indices = np.random.choice(n, size=n, replace=True)
data_boot = data.iloc[indices].reset_index(drop=True)
try:
pscores_boot, _ = estimate_propensity_score(
data_boot, d, propensity_controls, trim_threshold
)
# Compute ATT weights for WLS (same as main estimation).
D_boot = data_boot[d].values.astype(float)
att_weights_boot = np.where(D_boot == 1, 1.0, pscores_boot / (1 - pscores_boot))
m0_boot, _ = estimate_outcome_model(
data_boot, y, d, controls, sample_weights=att_weights_boot
)
Y_boot = data_boot[y].values.astype(float)
treat_mask = D_boot == 1
control_mask = D_boot == 0
if treat_mask.sum() < 1 or control_mask.sum() < 1:
continue
treat_term = (Y_boot[treat_mask] - m0_boot[treat_mask]).mean()
weights_boot = pscores_boot / (1 - pscores_boot)
weights_control = weights_boot[control_mask]
residuals_control = Y_boot[control_mask] - m0_boot[control_mask]
weights_sum = weights_control.sum()
if weights_sum > 0:
control_term = (weights_control * residuals_control).sum() / weights_sum
att_boot = treat_term - control_term
att_boots.append(att_boot)
except (ValueError, np.linalg.LinAlgError, FloatingPointError, ZeroDivisionError):
# Skip failed bootstrap iterations due to numerical issues.
continue
if len(att_boots) < n_bootstrap * 0.5:
warnings.warn(
f"Low bootstrap success rate: {len(att_boots)}/{n_bootstrap}",
UserWarning
)
if len(att_boots) < 10:
raise ValueError(f"Insufficient bootstrap samples: {len(att_boots)}")
att_boots = np.array(att_boots)
se = np.std(att_boots, ddof=1)
ci_lower = np.percentile(att_boots, 100 * alpha / 2)
ci_upper = np.percentile(att_boots, 100 * (1 - alpha / 2))
return se, ci_lower, ci_upper
# ============================================================================
# PSM (Propensity Score Matching) Estimator
# ============================================================================
[docs]
@dataclass
class PSMResult:
"""
Result container for propensity score matching ATT estimation.
Attributes
----------
att : float
ATT point estimate from nearest neighbor matching.
se : float
Standard error (heteroskedasticity-robust or bootstrap).
ci_lower : float
Lower bound of confidence interval.
ci_upper : float
Upper bound of confidence interval.
t_stat : float
t-statistic for testing ATT=0.
pvalue : float
Two-sided p-value.
propensity_scores : np.ndarray
Estimated propensity scores for all observations.
match_counts : np.ndarray
Number of matches for each treated unit.
matched_control_ids : list[list[int]]
List of matched control unit indices for each treated unit.
n_treated : int
Number of treated units.
n_control : int
Number of control units.
n_matched : int
Number of unique control units used in matching.
caliper : float or None
Caliper value used for matching, if any.
n_dropped : int
Number of treated units dropped due to caliper constraint.
"""
att: float
se: float
ci_lower: float
ci_upper: float
t_stat: float
pvalue: float
propensity_scores: np.ndarray
match_counts: np.ndarray
matched_control_ids: list[list[int]]
n_treated: int
n_control: int
n_matched: int
caliper: float | None
n_dropped: int
[docs]
def estimate_psm(
data: pd.DataFrame,
y: str,
d: str,
propensity_controls: list[str],
n_neighbors: int = 1,
with_replacement: bool = True,
caliper: float | None = None,
caliper_scale: str = 'sd',
trim_threshold: float = 0.01,
se_method: str = 'abadie_imbens',
n_bootstrap: int = 200,
seed: int | None = None,
alpha: float = 0.05,
match_order: str = 'data',
return_diagnostics: bool = False,
gvar_col: str | None = None,
ivar_col: str | None = None,
cohort_g: int | None = None,
period_r: int | None = None,
) -> PSMResult:
"""
Estimate ATT using propensity score matching.
Uses nearest-neighbor matching on estimated propensity scores to find
comparable control units for each treated unit. The ATT is estimated as
the average difference between treated outcomes and matched control outcomes.
The PSM-ATT estimator is:
.. math::
\\hat{\\tau} = \\frac{1}{N_1} \\sum_{D_i=1} \\left[ Y_i - \\frac{1}{k}
\\sum_{j \\in M(i)} Y_j \\right]
where M(i) is the set of k nearest-neighbor matches for unit i.
Parameters
----------
data : pd.DataFrame
Cross-sectional data with one row per unit.
y : str
Outcome variable column name (typically transformed outcome).
d : str
Treatment indicator column name (1=treated, 0=control).
propensity_controls : list[str]
Covariate column names for propensity score model.
n_neighbors : int, default=1
Number of control matches per treated unit (k).
with_replacement : bool, default=True
Whether to match with replacement.
caliper : float, optional
Maximum propensity score distance for valid matches.
caliper_scale : str, default='sd'
Scale for caliper: 'sd' (standard deviation units) or 'absolute'.
trim_threshold : float, default=0.01
Propensity score trimming threshold.
se_method : str, default='abadie_imbens'
Standard error method: 'abadie_imbens' uses heteroskedasticity-robust
variance estimation, 'bootstrap' uses nonparametric bootstrap.
n_bootstrap : int, default=200
Number of bootstrap replications when se_method='bootstrap'.
seed : int, optional
Random seed for bootstrap resampling.
alpha : float, default=0.05
Significance level for confidence intervals.
match_order : str, default='data'
Order in which to process treated units for matching.
return_diagnostics : bool, default=False
Whether to return additional diagnostic information.
gvar_col : str, optional
Cohort indicator column name (for staggered designs).
ivar_col : str, optional
Unit identifier column name (for staggered designs).
cohort_g : int, optional
Cohort value (for staggered designs).
period_r : int, optional
Period value (for staggered designs).
Returns
-------
PSMResult
Estimation results containing ATT estimate, standard error,
confidence interval, and matching diagnostics.
Raises
------
ValueError
If required columns are missing, sample sizes are insufficient,
or no valid matches can be found.
See Also
--------
estimate_ipwra : Doubly robust IPWRA estimator.
estimate_ipw : Pure IPW estimator.
"""
# Validate inputs.
_validate_psm_inputs(data, y, d, propensity_controls, n_neighbors)
# Remove observations with missing values.
all_vars = [y, d] + list(set(propensity_controls))
data_clean = data[all_vars].dropna().copy()
n = len(data_clean)
D = data_clean[d].values.astype(float)
Y = data_clean[y].values.astype(float)
n_treated = int(D.sum())
n_control = int((1 - D).sum())
if n_treated < 1:
raise ValueError("No treated units; cannot perform matching.")
if n_control < n_neighbors:
raise ValueError(
f"Control sample size ({n_control}) is less than number of neighbors "
f"({n_neighbors}); cannot perform matching."
)
if n_treated < 5:
warnings.warn(
f"Small treated sample (n_treated={n_treated}); PSM estimates may be unstable.",
UserWarning
)
# Estimate propensity scores.
pscores, _ = estimate_propensity_score(
data_clean, d, propensity_controls, trim_threshold
)
# Perform matching.
treat_indices = np.where(D == 1)[0]
control_indices = np.where(D == 0)[0]
pscores_treat = pscores[treat_indices]
pscores_control = pscores[control_indices]
# Compute caliper if specified.
actual_caliper = None
if caliper is not None:
if caliper_scale == 'sd':
ps_sd = np.std(pscores)
actual_caliper = caliper * ps_sd
else:
actual_caliper = caliper
# Execute nearest neighbor matching.
matched_control_ids, match_counts, n_dropped = _nearest_neighbor_match(
pscores_treat=pscores_treat,
pscores_control=pscores_control,
n_neighbors=n_neighbors,
with_replacement=with_replacement,
caliper=actual_caliper,
)
# Compute ATT.
Y_treat = Y[treat_indices]
Y_control = Y[control_indices]
# Filter out treated units dropped due to caliper.
valid_treat_mask = np.array([len(m) > 0 for m in matched_control_ids])
n_valid = valid_treat_mask.sum()
if n_valid == 0:
raise ValueError(
"All treated units failed to find valid matches. "
"Consider relaxing the caliper or checking propensity score overlap."
)
# 当匹配成功率低于50%时发出警告
match_success_rate = n_valid / n_treated
if match_success_rate < 0.5:
warnings.warn(
f"Low match success rate: {match_success_rate:.1%} "
f"({n_valid}/{n_treated} treated units matched). "
f"Results may be unreliable. Consider relaxing the caliper "
f"or checking propensity score overlap.",
UserWarning,
stacklevel=2,
)
# Compute matched mean for each treated unit.
att_individual = []
for i, matches in enumerate(matched_control_ids):
if len(matches) > 0:
# Matches contains indices within control group.
y_matched = Y_control[matches].mean()
att_i = Y_treat[i] - y_matched
att_individual.append(att_i)
att = np.mean(att_individual)
# Count unique matched control units.
all_matched = set()
for matches in matched_control_ids:
all_matched.update(matches)
n_matched = len(all_matched)
# Compute standard errors.
if se_method == 'abadie_imbens':
se, ci_lower, ci_upper = _compute_psm_se_abadie_imbens(
Y_treat=Y_treat,
Y_control=Y_control,
matched_control_ids=matched_control_ids,
att=att,
alpha=alpha,
)
elif se_method == 'bootstrap':
se, ci_lower, ci_upper = _compute_psm_se_bootstrap(
data=data_clean,
y=y,
d=d,
propensity_controls=propensity_controls,
n_neighbors=n_neighbors,
with_replacement=with_replacement,
caliper=caliper,
caliper_scale=caliper_scale,
trim_threshold=trim_threshold,
n_bootstrap=n_bootstrap,
seed=seed,
alpha=alpha,
)
else:
raise ValueError(
f"Unknown se_method: {se_method}. "
"Use 'abadie_imbens' or 'bootstrap'."
)
# Compute t-statistic and p-value.
if se > 0:
t_stat = att / se
pvalue = 2 * (1 - stats.norm.cdf(abs(t_stat)))
else:
t_stat = np.nan
pvalue = np.nan
return PSMResult(
att=att,
se=se,
ci_lower=ci_lower,
ci_upper=ci_upper,
t_stat=t_stat,
pvalue=pvalue,
propensity_scores=pscores,
match_counts=match_counts,
matched_control_ids=matched_control_ids,
n_treated=n_treated,
n_control=n_control,
n_matched=n_matched,
caliper=actual_caliper,
n_dropped=n_dropped,
)
def _validate_psm_inputs(
data: pd.DataFrame,
y: str,
d: str,
propensity_controls: list[str],
n_neighbors: int,
) -> None:
"""
Validate PSM input parameters.
Parameters
----------
data : pd.DataFrame
Dataset to validate.
y : str
Outcome variable column name.
d : str
Treatment indicator column name.
propensity_controls : list[str]
Propensity score control variable names.
n_neighbors : int
Number of nearest neighbors.
Raises
------
ValueError
If any validation check fails.
"""
if y not in data.columns:
raise ValueError(f"Outcome variable '{y}' not found in data.")
if d not in data.columns:
raise ValueError(f"Treatment indicator '{d}' not found in data.")
missing_controls = [c for c in propensity_controls if c not in data.columns]
if missing_controls:
raise ValueError(f"Propensity score controls not found: {missing_controls}")
if n_neighbors < 1:
raise ValueError(f"n_neighbors must be >= 1, got: {n_neighbors}")
# Check that treatment indicator is binary.
d_vals = data[d].dropna().unique()
if not set(d_vals).issubset({0, 1, 0.0, 1.0}):
raise ValueError(
f"Treatment indicator '{d}' must be binary (0/1), "
f"found unique values: {d_vals}"
)
def _nearest_neighbor_match(
pscores_treat: np.ndarray,
pscores_control: np.ndarray,
n_neighbors: int,
with_replacement: bool,
caliper: float | None,
) -> tuple[list[list[int]], np.ndarray, int]:
"""
Perform nearest neighbor propensity score matching.
For each treated unit, finds the k control units with the closest
propensity scores, subject to optional caliper constraints.
Parameters
----------
pscores_treat : np.ndarray
Propensity scores for treated units.
pscores_control : np.ndarray
Propensity scores for control units.
n_neighbors : int
Number of control units to match per treated unit.
with_replacement : bool
Whether control units can be matched to multiple treated units.
caliper : float or None
Maximum propensity score distance for valid matches.
Returns
-------
matched_control_ids : list[list[int]]
List of matched control unit indices for each treated unit.
match_counts : np.ndarray
Number of valid matches for each treated unit.
n_dropped : int
Number of treated units dropped due to caliper constraint.
"""
n_treat = len(pscores_treat)
n_control = len(pscores_control)
matched_control_ids: list[list[int]] = []
match_counts = np.zeros(n_treat, dtype=int)
n_dropped = 0
# Track used controls for matching without replacement.
used_controls: set | None = None if with_replacement else set()
for i in range(n_treat):
ps_i = pscores_treat[i]
# Compute distances to all control units.
distances = np.abs(pscores_control - ps_i)
# For matching without replacement, exclude already-used controls.
if not with_replacement and used_controls:
available_mask = np.array([
j not in used_controls for j in range(n_control)
])
if not available_mask.any():
# No available control units.
matched_control_ids.append([])
n_dropped += 1
continue
distances[~available_mask] = np.inf
# Apply caliper constraint.
if caliper is not None:
valid_mask = distances <= caliper
if not valid_mask.any():
# No valid matches within caliper.
matched_control_ids.append([])
n_dropped += 1
continue
# Find k nearest neighbors.
k = min(n_neighbors, n_control)
nearest_indices = np.argsort(distances)[:k]
# Verify caliper constraint for selected neighbors.
if caliper is not None:
nearest_indices = [
idx for idx in nearest_indices
if distances[idx] <= caliper
]
if len(nearest_indices) == 0:
matched_control_ids.append([])
n_dropped += 1
continue
# Record matches.
matched_control_ids.append(list(nearest_indices))
match_counts[i] = len(nearest_indices)
# Mark controls as used for matching without replacement.
if not with_replacement and used_controls is not None:
used_controls.update(nearest_indices)
return matched_control_ids, match_counts, n_dropped
def _compute_psm_se_abadie_imbens(
Y_treat: np.ndarray,
Y_control: np.ndarray,
matched_control_ids: list[list[int]],
att: float,
alpha: float = 0.05,
) -> tuple[float, float, float]:
"""
Compute heteroskedasticity-robust standard error for matching estimators.
Uses the variance of individual treatment effects to estimate the
standard error of the ATT, based on the asymptotic variance formula
that accounts for heterogeneous treatment effects across matched pairs.
Parameters
----------
Y_treat : np.ndarray
Outcome values for treated units.
Y_control : np.ndarray
Outcome values for control units.
matched_control_ids : list[list[int]]
Matched control indices for each treated unit.
att : float
Estimated ATT value.
alpha : float, default=0.05
Significance level for confidence interval.
Returns
-------
se : float
Standard error estimate.
ci_lower : float
Lower bound of confidence interval.
ci_upper : float
Upper bound of confidence interval.
"""
n_valid = sum(1 for m in matched_control_ids if len(m) > 0)
if n_valid < 2:
return np.nan, np.nan, np.nan
# Compute individual treatment effects for each matched pair.
individual_effects = []
for i, matches in enumerate(matched_control_ids):
if len(matches) > 0:
y_matched = Y_control[matches].mean()
effect_i = Y_treat[i] - y_matched
individual_effects.append(effect_i)
individual_effects = np.array(individual_effects)
# Use variance of individual effects for SE estimation.
var_effects = np.var(individual_effects, ddof=1)
var_att = var_effects / n_valid
se = np.sqrt(var_att)
# Confidence interval.
z_crit = stats.norm.ppf(1 - alpha / 2)
ci_lower = att - z_crit * se
ci_upper = att + z_crit * se
return se, ci_lower, ci_upper
def _compute_psm_se_bootstrap(
data: pd.DataFrame,
y: str,
d: str,
propensity_controls: list[str],
n_neighbors: int,
with_replacement: bool,
caliper: float | None,
caliper_scale: str,
trim_threshold: float,
n_bootstrap: int,
seed: int | None,
alpha: float,
) -> tuple[float, float, float]:
"""
Compute PSM standard error using nonparametric bootstrap.
Resamples the entire PSM procedure including propensity score estimation
and nearest-neighbor matching. This approach properly accounts for all
sources of estimation uncertainty.
Parameters
----------
data : pd.DataFrame
Dataset used for estimation.
y : str
Outcome variable column name.
d : str
Treatment indicator column name.
propensity_controls : list[str]
Propensity score control variable names.
n_neighbors : int
Number of nearest neighbors.
with_replacement : bool
Whether to match with replacement.
caliper : float or None
Caliper constraint value.
caliper_scale : str
Scale for caliper ('sd' or 'absolute').
trim_threshold : float
Propensity score trimming threshold.
n_bootstrap : int
Number of bootstrap replications.
seed : int or None
Random seed for reproducibility.
alpha : float
Significance level for confidence interval.
Returns
-------
se : float
Bootstrap standard error.
ci_lower : float
Percentile confidence interval lower bound.
ci_upper : float
Percentile confidence interval upper bound.
"""
if seed is not None:
np.random.seed(seed)
n = len(data)
att_boots = []
for _ in range(n_bootstrap):
# Resample with replacement.
indices = np.random.choice(n, size=n, replace=True)
data_boot = data.iloc[indices].reset_index(drop=True)
try:
# Estimate propensity scores.
pscores_boot, _ = estimate_propensity_score(
data_boot, d, propensity_controls, trim_threshold
)
D_boot = data_boot[d].values.astype(float)
Y_boot = data_boot[y].values.astype(float)
treat_indices = np.where(D_boot == 1)[0]
control_indices = np.where(D_boot == 0)[0]
if len(treat_indices) < 1 or len(control_indices) < n_neighbors:
continue
pscores_treat = pscores_boot[treat_indices]
pscores_control = pscores_boot[control_indices]
# Compute caliper.
actual_caliper = None
if caliper is not None:
if caliper_scale == 'sd':
ps_sd = np.std(pscores_boot)
actual_caliper = caliper * ps_sd
else:
actual_caliper = caliper
# Perform matching.
matched_ids, _, _ = _nearest_neighbor_match(
pscores_treat, pscores_control,
n_neighbors, with_replacement, actual_caliper
)
# Compute ATT.
Y_treat = Y_boot[treat_indices]
Y_control = Y_boot[control_indices]
att_individual = []
for i, matches in enumerate(matched_ids):
if len(matches) > 0:
y_matched = Y_control[matches].mean()
att_i = Y_treat[i] - y_matched
att_individual.append(att_i)
if len(att_individual) > 0:
att_boot = np.mean(att_individual)
att_boots.append(att_boot)
except (ValueError, np.linalg.LinAlgError, FloatingPointError, ZeroDivisionError):
# Skip failed bootstrap iterations due to numerical issues.
continue
if len(att_boots) < n_bootstrap * 0.5:
warnings.warn(
f"Low bootstrap success rate: {len(att_boots)}/{n_bootstrap}",
UserWarning
)
if len(att_boots) < 10:
raise ValueError(f"Insufficient bootstrap samples: {len(att_boots)}")
att_boots = np.array(att_boots)
se = np.std(att_boots, ddof=1)
ci_lower = np.percentile(att_boots, 100 * alpha / 2)
ci_upper = np.percentile(att_boots, 100 * (1 - alpha / 2))
return se, ci_lower, ci_upper