Source code for lwdid.results

"""
Result container for difference-in-differences estimation outputs.

This module provides the LWDIDResults class for encapsulating estimation
outputs from rolling transformation DiD methodology, supporting three scenarios:

1. **Small-sample common timing**: Results include exact t-based inference
   statistics under classical linear model assumptions.

2. **Large-sample common timing**: Results include asymptotic inference with
   heteroskedasticity-robust standard errors.

3. **Staggered adoption**: Results include cohort-time specific effects,
   cohort-level aggregations, and overall weighted effects with flexible
   control group strategies.

The class implements immutable core attributes via properties to ensure
result integrity, provides multiple summary formats (text, LaTeX, Excel,
CSV), supports event study visualization for staggered designs, and
includes period-specific effects for common timing designs.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pandas as pd

if TYPE_CHECKING:
    import matplotlib.axes
    from lwdid.staggered.parallel_trends import ParallelTrendsTestResult


[docs] class LWDIDResults: """ Container for difference-in-differences estimation results. Stores all estimation outputs from the lwdid() function implementing the rolling transformation methodology. Supports three scenarios: 1. **Small-sample common timing**: Exact t-based inference under classical linear model assumptions. 2. **Large-sample common timing**: Asymptotic inference with heteroskedasticity-robust standard errors. 3. **Staggered adoption**: Cohort-time specific effects with flexible control group strategies. All core attributes are read-only properties to ensure result integrity. Provides methods for displaying, visualizing, and exporting results. Attributes ---------- att : float Average treatment effect on the treated (ATT) point estimate. se_att : float Standard error of ATT. t_stat : float t-statistic for H0: ATT = 0. pvalue : float Two-sided p-value for t-test. ci_lower : float Lower bound of 95% confidence interval. ci_upper : float Upper bound of 95% confidence interval. nobs : int Number of observations in the regression. n_treated : int Number of treated units. n_control : int Number of control units. K : int Last pre-treatment period index. tpost1 : int First post-treatment period index. df_resid : int Residual degrees of freedom from the main regression. df_inference : int Degrees of freedom used for inference. For cluster-robust standard errors, this is G - 1 (number of clusters minus 1). For other variance estimators, this equals df_resid. rolling : str Transformation method used ('demean', 'detrend', 'demeanq', or 'detrendq'). vce_type : str Variance estimator type ('ols', 'robust', 'hc1', 'hc3', or 'cluster'). cluster_var : str or None Clustering variable name (if vce='cluster'). n_clusters : int or None Number of clusters (if vce='cluster'). controls_used : bool Whether control variables were included. controls : list List of control variable names used. params : array-like Full vector of regression coefficients. bse : array-like Standard errors of all coefficients. vcov : array-like Variance-covariance matrix. att_by_period : pd.DataFrame Period-specific ATT estimates with columns: period, tindex, beta, se, ci_lower, ci_upper, tstat, pval, N. ri_pvalue : float or None Randomization inference p-value (if ri=True was specified). ri_seed : int or None Random seed used for RI. rireps : int or None Number of RI permutations. ri_method : str or None Randomization inference method used ('bootstrap' or 'permutation'). Only available if ri=True was specified. ri_valid : int or None Number of valid (successful) RI replications. Only available if ri=True. ri_failed : int or None Number of failed RI replications. Only available if ri=True. data : pd.DataFrame Transformed data used for regression. Contains the rolling-transformed outcome variable and other regression variables. If the original ivar was string type, this DataFrame contains numeric IDs (1, 2, 3, ...). The mapping between original and numeric IDs is stored in data.attrs['id_mapping']. is_staggered : bool Whether this is a staggered DiD estimation. cohorts : list of int Sorted list of treatment cohorts (first treatment periods). Only available when is_staggered=True. cohort_sizes : dict Number of units in each cohort. Only available when is_staggered=True. att_by_cohort_time : pd.DataFrame or None (g,r)-specific ATT estimates with columns: cohort, period, event_time, att, se, ci_lower, ci_upper, t_stat, pvalue, n_treated, n_control. Only available when is_staggered=True. att_by_cohort : pd.DataFrame or None Cohort-specific ATT estimates (if aggregate='cohort' or 'overall'). Only available when is_staggered=True. att_overall : float or None Overall weighted ATT (if aggregate='overall'). Only available when is_staggered=True. se_overall : float or None Standard error of overall ATT. Only available when is_staggered=True. cohort_weights : dict Cohort weights used for overall effect (omega_g = N_g / N_treat). Only available when is_staggered=True. control_group : str or None User-specified control group strategy. Only available when is_staggered=True. control_group_used : str or None Actual control group strategy used (may differ due to auto-switching). Only available when is_staggered=True. aggregate : str or None Aggregation level ('none', 'cohort', 'overall'). Only available when is_staggered=True. estimator : str or None Estimation method ('ra', 'ipw', 'ipwra', 'psm'). Only available when is_staggered=True. Methods ------- summary() Returns formatted results summary string. plot(gid=None, graph_options=None) Generates plot of residualized outcomes (control vs. treated). plot_event_study(**kwargs) Generates event study diagram for staggered designs. to_excel(path) Exports results to Excel file with multiple sheets. to_csv(path) Exports period-specific effects to CSV. to_latex(path) Exports results to LaTeX table format. See Also -------- lwdid : Main estimation function that produces LWDIDResults objects. """
[docs] def __init__( self, results_dict: dict[str, Any], metadata: dict[str, Any], att_by_period: pd.DataFrame | None = None, cohort_time_effects: list | None = None, ): """ Initialize LWDIDResults container with estimation outputs. Parameters ---------- results_dict : dict Estimation results containing keys: 'att', 'se_att', 't_stat', 'pvalue', 'ci_lower', 'ci_upper', 'nobs', 'df_resid', 'params', 'bse', 'vcov', 'resid', 'vce_type'. metadata : dict Metadata containing keys: 'K', 'tpost1', 'depvar', 'N_treated', 'N_control'. att_by_period : pd.DataFrame, optional Period-specific effect estimates for common timing design. cohort_time_effects : list, optional List of CohortTimeEffect objects for staggered designs. """ self._att = results_dict['att'] self._se_att = results_dict['se_att'] self._t_stat = results_dict['t_stat'] self._pvalue = results_dict['pvalue'] self._ci_lower = results_dict['ci_lower'] self._ci_upper = results_dict['ci_upper'] self._K = metadata['K'] self._tpost1 = metadata['tpost1'] self._nobs = results_dict['nobs'] self._n_treated = results_dict.get('n_treated_sample', metadata['N_treated']) self._n_control = results_dict.get('n_control_sample', metadata['N_control']) self._df_resid = results_dict['df_resid'] self._df_inference = results_dict.get('df_inference', results_dict['df_resid']) self._cmd = 'lwdid' self._depvar = metadata['depvar'] self._rolling = metadata.get('rolling', 'demean') self._vce_type = results_dict['vce_type'] self._cluster_var = results_dict.get('cluster_var', None) self._n_clusters = results_dict.get('n_clusters', None) self._controls_used = results_dict.get('controls_used', False) self._controls = results_dict.get('controls', []) self._params = results_dict['params'] self._bse = results_dict['bse'] self._vcov = results_dict['vcov'] self._att_by_period = att_by_period self._cohort_time_effects = cohort_time_effects self._resid = results_dict['resid'] self._metadata = metadata self._ri_pvalue: float | None = None self._ri_seed: int | None = None self._rireps: int | None = None self._ri_method: str | None = None self._ri_valid: int | None = None self._ri_failed: int | None = None self._data: pd.DataFrame | None = None # === Pre-treatment dynamics attributes === self._att_pre_treatment: pd.DataFrame | None = results_dict.get('att_pre_treatment', None) self._parallel_trends_test: ParallelTrendsTestResult | None = results_dict.get('parallel_trends_test', None) self._include_pretreatment: bool = results_dict.get('include_pretreatment', False) # === Staggered-specific attributes === self._is_staggered: bool = results_dict.get('is_staggered', False) if self._is_staggered: # Cohort information self._cohorts = results_dict.get('cohorts', []) self._cohort_sizes = results_dict.get('cohort_sizes', {}) # Effect estimates self._att_by_cohort_time = results_dict.get('att_by_cohort_time', None) self._att_by_cohort = results_dict.get('att_by_cohort', None) self._att_overall = results_dict.get('att_overall', None) self._se_overall = results_dict.get('se_overall', None) self._ci_overall_lower = results_dict.get('ci_overall_lower', None) self._ci_overall_upper = results_dict.get('ci_overall_upper', None) self._t_stat_overall = results_dict.get('t_stat_overall', None) self._pvalue_overall = results_dict.get('pvalue_overall', None) # Cohort weights self._cohort_weights = results_dict.get('cohort_weights', {}) # Configuration self._control_group = results_dict.get('control_group', 'not_yet_treated') self._control_group_used = results_dict.get('control_group_used', 'not_yet_treated') self._aggregate = results_dict.get('aggregate', 'cohort') self._estimator = results_dict.get('estimator', 'ra') # n_never_treated attribute self._n_never_treated = results_dict.get('n_never_treated', 0) else: # Set defaults for non-staggered case self._cohorts = [] self._cohort_sizes = {} self._att_by_cohort_time = None self._att_by_cohort = None self._att_overall = None self._se_overall = None self._ci_overall_lower = None self._ci_overall_upper = None self._t_stat_overall = None self._pvalue_overall = None self._cohort_weights = {} self._control_group = None self._control_group_used = None self._aggregate = None self._estimator = None self._n_never_treated = None
@property def att(self) -> float: """ATT point estimate.""" return self._att @property def se_att(self) -> float: """Standard error of ATT.""" return self._se_att @property def t_stat(self) -> float: """t-statistic for ATT.""" return self._t_stat @property def pvalue(self) -> float: """Two-sided p-value.""" return self._pvalue @property def ci_lower(self) -> float: """95% CI lower bound.""" return self._ci_lower @property def ci_upper(self) -> float: """95% CI upper bound.""" return self._ci_upper @property def nobs(self) -> int: """Number of observations.""" return self._nobs @property def n_treated(self) -> int: """Number of treated units.""" return self._n_treated @property def n_control(self) -> int: """Number of control units.""" return self._n_control @property def df_resid(self) -> int: """Residual degrees of freedom.""" return self._df_resid @property def df_inference(self) -> int: """Degrees of freedom for inference.""" return self._df_inference @property def K(self) -> int: """Last pre-treatment period index.""" return self._K @property def tpost1(self) -> int: """First post-treatment period index.""" return self._tpost1 @property def cmd(self) -> str: """Command name.""" return self._cmd @property def depvar(self) -> str: """Dependent variable name.""" return self._depvar @property def rolling(self) -> str: """Transformation method used.""" return self._rolling @property def vce_type(self) -> str | None: """Variance estimator type.""" return self._vce_type @property def cluster_var(self) -> str | None: """Clustering variable name.""" return self._cluster_var @property def n_clusters(self) -> int | None: """Number of clusters.""" return self._n_clusters @property def controls_used(self) -> bool: """Whether control variables were included.""" return self._controls_used @property def controls(self) -> list: """List of control variable names.""" return list(self._controls) @property def params(self): """Full coefficient vector.""" return self._params @property def bse(self): """Standard errors of coefficients.""" return self._bse @property def vcov(self): """Variance-covariance matrix.""" return self._vcov @property def att_by_period(self) -> pd.DataFrame | None: """Period-specific ATT estimates (returns copy).""" if self._att_by_period is None: return None return self._att_by_period.copy() @property def ri_pvalue(self) -> float | None: """Randomization inference p-value.""" return self._ri_pvalue @ri_pvalue.setter def ri_pvalue(self, value: float | None) -> None: self._ri_pvalue = value @property def ri_seed(self) -> int | None: """Random seed used for RI.""" return self._ri_seed @ri_seed.setter def ri_seed(self, value: int | None) -> None: self._ri_seed = value @property def rireps(self) -> int | None: """Number of RI replications.""" return self._rireps @rireps.setter def rireps(self, value: int | None) -> None: self._rireps = value @property def ri_method(self) -> str | None: """Randomization inference method.""" return self._ri_method @ri_method.setter def ri_method(self, value: str | None) -> None: self._ri_method = value @property def ri_valid(self) -> int | None: """Number of valid RI replications.""" return self._ri_valid @ri_valid.setter def ri_valid(self, value: int | None) -> None: self._ri_valid = value @property def ri_failed(self) -> int | None: """Number of failed RI replications.""" return self._ri_failed @ri_failed.setter def ri_failed(self, value: int | None) -> None: self._ri_failed = value @property def data(self) -> pd.DataFrame | None: """Transformed data used for regression.""" return self._data @data.setter def data(self, value: pd.DataFrame | None) -> None: self._data = value # === Staggered-specific Properties === @property def is_staggered(self) -> bool: """Whether this is a staggered DiD estimation.""" return self._is_staggered @property def cohorts(self) -> list: """List of treatment cohorts.""" return list(self._cohorts) @property def cohort_sizes(self) -> dict: """Number of units in each cohort.""" return dict(self._cohort_sizes) @property def att_by_cohort_time(self) -> pd.DataFrame | None: """Cohort-time specific ATT estimates (returns copy).""" if self._att_by_cohort_time is None: return None return self._att_by_cohort_time.copy() @property def att_by_cohort(self) -> pd.DataFrame | None: """Cohort-specific ATT estimates (returns copy).""" if self._att_by_cohort is None: return None return self._att_by_cohort.copy() @property def att_overall(self) -> float | None: """Overall weighted ATT estimate.""" return self._att_overall @property def se_overall(self) -> float | None: """Standard error of overall ATT.""" return self._se_overall @property def ci_overall_lower(self) -> float | None: """95% CI lower bound for overall ATT.""" return self._ci_overall_lower @property def ci_overall_upper(self) -> float | None: """95% CI upper bound for overall ATT.""" return self._ci_overall_upper @property def t_stat_overall(self) -> float | None: """t-statistic for overall ATT.""" return self._t_stat_overall @property def pvalue_overall(self) -> float | None: """p-value for overall ATT.""" return self._pvalue_overall @property def cohort_weights(self) -> dict: """Cohort weights for overall effect.""" return dict(self._cohort_weights) @property def control_group(self) -> str | None: """User-specified control group strategy.""" return self._control_group @property def control_group_used(self) -> str | None: """Actual control group strategy used.""" return self._control_group_used @property def aggregate(self) -> str | None: """Aggregation level.""" return self._aggregate @property def estimator(self) -> str | None: """Estimation method.""" return self._estimator @property def n_never_treated(self) -> int | None: """Number of never-treated units.""" return self._n_never_treated # === Pre-treatment Dynamics Properties === @property def att_pre_treatment(self) -> pd.DataFrame | None: """ Pre-treatment ATT estimates (returns copy). DataFrame with columns: cohort, period, event_time, att, se, ci_lower, ci_upper, t_stat, pvalue, n_treated, n_control, is_anchor, rolling_window_size. Only available when include_pretreatment=True was specified during estimation. """ if self._att_pre_treatment is None: return None return self._att_pre_treatment.copy() @att_pre_treatment.setter def att_pre_treatment(self, value: pd.DataFrame | None) -> None: self._att_pre_treatment = value @property def parallel_trends_test(self) -> ParallelTrendsTestResult | None: """ Parallel trends test results. Contains individual t-tests for each pre-treatment period and joint F-test for H0: all pre-treatment ATT = 0. Only available when include_pretreatment=True and pretreatment_test=True were specified during estimation. """ return self._parallel_trends_test @parallel_trends_test.setter def parallel_trends_test(self, value: ParallelTrendsTestResult | None) -> None: self._parallel_trends_test = value @property def include_pretreatment(self) -> bool: """Whether pre-treatment dynamics were computed.""" return self._include_pretreatment
[docs] def summary(self) -> str: """ Generate a formatted summary of estimation results. For staggered designs, dispatches to summary_staggered(). For common timing designs, displays ATT estimate, standard error, t-statistic, p-value, confidence interval, and period-specific effects if available. Returns ------- str Formatted results summary string suitable for console output. """ if self.is_staggered: return self.summary_staggered() sep_line = "=" * 80 sub_line = "-" * 80 output = [] output.append(sep_line) output.append(" lwdid Results") output.append(sep_line) output.append(f"Transformation: {self.rolling}") vce_desc = { 'ols': 'OLS (Homoskedastic)', 'robust': 'HC1 (Heteroskedasticity-robust)', 'hc1': 'HC1 (Heteroskedasticity-robust)', 'hc3': 'HC3 (Small-sample adjusted)', 'cluster': f'Cluster-robust (clustered by {self.cluster_var})' if self.cluster_var else 'Cluster-robust' } vce_display = vce_desc.get(self.vce_type, self.vce_type) output.append(f"Variance Type: {vce_display}") if self.vce_type == 'cluster' and self.n_clusters is not None: output.append(f"Number of clusters: {self.n_clusters}") output.append(f"Dependent Variable: {self.depvar}") output.append("") output.append(f"Number of observations: {self.nobs}") output.append(f"Number of treated units: {self.n_treated}") output.append(f"Number of control units: {self.n_control}") output.append(f"Pre-treatment periods: {self.K} (K={self.K})") output.append(f"Post-treatment periods: {self.tpost1} to end (tpost1={self.tpost1})") output.append("") output.append(sub_line) output.append("Average Treatment Effect on the Treated") output.append(sub_line) output.append(f"ATT: {self.att:>10.4f}") output.append(f"Std. Err.: {self.se_att:>10.4f} ({self.vce_type})") output.append(f"t-stat: {self.t_stat:>10.2f}") output.append(f"P>|t|: {self.pvalue:>10.3f}") output.append(f"df: {self.df_inference:>10}") output.append(f"[95% Conf. Interval]: {self.ci_lower:>8.4f} {self.ci_upper:>8.4f}") if self.ri_pvalue is not None: output.append("") output.append("Randomization Inference:") method_str = f"method={self.ri_method}" if self.ri_method else "" valid_str = f", valid={self.ri_valid}/{self.rireps}" if self.ri_valid is not None else "" output.append(f"RI P-value: {self.ri_pvalue:>10.3f} ({method_str}, seed={self.ri_seed}{valid_str})") output.append(sep_line) if self.att_by_period is not None: output.append("") output.append("=== Period-by-period post-treatment effects ===") output.append(self.att_by_period.head(5).to_string(index=False)) if len(self.att_by_period) > 5: output.append(f"... ({len(self.att_by_period) - 5} more periods)") output.append("") output.append("Use results.att_by_period to view all period-specific estimates") return "\n".join(output)
[docs] def summary_staggered(self) -> str: """ Generate a formatted summary for staggered DiD estimation results. Displays treatment cohorts, sample sizes, control group strategy, overall weighted effect (if aggregate='overall'), and cohort-specific effects (if aggregate='cohort' or 'overall'). Returns ------- str Formatted results summary string suitable for console output. Raises ------ ValueError If called on non-staggered estimation results. Notes ----- The summary output varies by aggregation level. When aggregate='none', only (g,r)-specific effects are available. When aggregate='cohort', cohort-specific effects are shown. When aggregate='overall', both cohort-specific and overall weighted effects are displayed. If the control group strategy was automatically switched from the user-specified value, a notification is included. """ if not self.is_staggered: raise ValueError( "summary_staggered() requires staggered DiD results. " "Use summary() for common timing results." ) sep_line = "=" * 70 sub_line = "-" * 70 output = [] output.append(sep_line) output.append("LWDID Staggered DiD Results") output.append(sep_line) # === Basic information === output.append(f"Treatment Cohorts: {', '.join(map(str, self.cohorts))}") output.append(f"Number of Treated Units: {self.n_treated}") output.append(f"Number of Control Units: {self.n_control}") if self.n_never_treated is not None: output.append(f"Number of Never Treated Units: {self.n_never_treated}") output.append(f"Control Group Strategy: {self.control_group_used}") # If control group was auto-switched, show notification if self.control_group != self.control_group_used: output.append( f" Note: Auto-switched from '{self.control_group}' " f"for {self.aggregate} effect estimation" ) output.append(f"Transformation: {self.rolling}") output.append(f"Estimator: {self.estimator}") output.append(f"Aggregation: {self.aggregate}") output.append(sub_line) # === Overall effect (shown when aggregate='overall') === if self.att_overall is not None: output.append("") output.append("Overall Weighted Effect (τ_ω):") output.append(f" ATT_ω = {self.att_overall:.4f}") output.append(f" SE = {self.se_overall:.4f}") if self.t_stat_overall is not None: output.append(f" t-stat = {self.t_stat_overall:.3f}") if self.pvalue_overall is not None: output.append(f" P>|t| = {self.pvalue_overall:.3f}") if self.ci_overall_lower is not None and self.ci_overall_upper is not None: output.append( f" 95% CI: [{self.ci_overall_lower:.4f}, {self.ci_overall_upper:.4f}]" ) output.append("") # Cohort weights if self.cohort_weights: output.append("Cohort Weights:") for g in sorted(self.cohort_weights.keys()): w = self.cohort_weights[g] n = self.cohort_sizes.get(g, '?') output.append(f" Cohort {g}: ω = {w:.3f} (N = {n})") output.append(sub_line) # === Cohort effects (shown when aggregate='cohort' or 'overall') === if self.att_by_cohort is not None and not self.att_by_cohort.empty: output.append("") output.append("Cohort-Specific Effects (τ_g):") # Header header = ( f" {'Cohort':>6} {'ATT':>8} {'SE':>8} {'t-stat':>7} " f"{'P>|t|':>6} {'[95% CI]':>18} {'N_units':>7} {'N_periods':>9}" ) output.append(header) for _, row in self.att_by_cohort.iterrows(): cohort = int(row['cohort']) att = row['att'] se = row['se'] # Use column check for optional fields ci_l = ( row['ci_lower'] if 'ci_lower' in row.index else (att - 1.96 * se) ) ci_u = ( row['ci_upper'] if 'ci_upper' in row.index else (att + 1.96 * se) ) t_stat = ( row['t_stat'] if 't_stat' in row.index else (att / se if se > 0 else float('nan')) ) pval = row['pvalue'] if 'pvalue' in row.index else float('nan') n_units = row['n_units'] if 'n_units' in row.index else '?' n_periods = row['n_periods'] if 'n_periods' in row.index else '?' ci_str = f"[{ci_l:>6.3f}, {ci_u:>6.3f}]" line = ( f" {cohort:>6} {att:>8.4f} {se:>8.4f} {t_stat:>7.2f} " f"{pval:>6.3f} {ci_str:>18} {n_units:>7} {n_periods:>9}" ) output.append(line) output.append(sub_line) # === Hint information === output.append("") if self.att_by_cohort_time is not None: output.append("Use results.att_by_cohort_time for (g,r)-specific effects") output.append("Use results.plot_event_study() for Event Study visualization") # === Pre-treatment Dynamics (shown when include_pretreatment=True) === if self.include_pretreatment and self.att_pre_treatment is not None: output.append(sub_line) output.append("") output.append("Pre-treatment Dynamics") output.append("-" * 40) # Parallel trends test results if self.parallel_trends_test is not None: pt = self.parallel_trends_test output.append("") output.append("Parallel Trends Test (H0: all pre-treatment ATT = 0):") output.append(f" F-statistic: {pt.joint_f_stat:.4f}") output.append(f" P-value: {pt.joint_pvalue:.4f}") output.append(f" DF (num, den): ({pt.joint_df1}, {pt.joint_df2})") output.append(f" Reject H0: {'Yes' if pt.reject_null else 'No'} (α={pt.alpha:.2f})") output.append("") # Pre-treatment ATT summary pre_df = self.att_pre_treatment non_anchor = pre_df[~pre_df['is_anchor']] if len(non_anchor) > 0: output.append("Pre-treatment ATT Estimates:") header = ( f" {'e':>4} {'ATT':>10} {'SE':>8} {'t-stat':>7} " f"{'P>|t|':>6} {'[95% CI]':>20} {'Anchor':>6}" ) output.append(header) # Sort by event_time descending (anchor first, then earlier periods) for _, row in pre_df.sort_values('event_time', ascending=False).iterrows(): e = int(row['event_time']) att = row['att'] se = row['se'] t_stat = row['t_stat'] pval = row['pvalue'] ci_l = row['ci_lower'] ci_u = row['ci_upper'] is_anchor = row['is_anchor'] anchor_str = " *" if is_anchor else "" if is_anchor: # Anchor point: show as reference ci_str = "[ 0.0000, 0.0000]" line = ( f" {e:>4} {att:>10.4f} {se:>8.4f} {'---':>7} " f"{'---':>6} {ci_str:>20}{anchor_str}" ) else: ci_str = f"[{ci_l:>8.4f}, {ci_u:>8.4f}]" line = ( f" {e:>4} {att:>10.4f} {se:>8.4f} {t_stat:>7.2f} " f"{pval:>6.3f} {ci_str:>20}{anchor_str}" ) output.append(line) output.append("") output.append(" * Anchor point (e=-1): ATT=0 by construction") output.append("") output.append("Use results.att_pre_treatment for full pre-treatment effects DataFrame") output.append("Use results.plot_event_study(include_pre_treatment=True) for visualization") output.append(sep_line) return "\n".join(output)
[docs] def __repr__(self) -> str: """Return a concise string representation of the results object.""" if self.is_staggered: pre_info = ", pre_treatment=True" if self.include_pretreatment else "" if self.att_overall is not None: return ( f"LWDIDResults(staggered=True, att_overall={self.att_overall:.4f}, " f"se={self.se_overall:.4f}, cohorts={len(self.cohorts)}, " f"N_treated={self.n_treated}, N_control={self.n_control}{pre_info})" ) else: return ( f"LWDIDResults(staggered=True, cohorts={len(self.cohorts)}, " f"aggregate='{self.aggregate}', N_treated={self.n_treated}{pre_info})" ) else: return ( f"LWDIDResults(att={self.att:.4f}, se={self.se_att:.4f}, " f"method='{self.rolling}', N={self.nobs})" )
[docs] def __str__(self) -> str: """Return the formatted summary as the string representation.""" return self.summary()
@property def metadata(self) -> dict[str, Any]: """Internal metadata dictionary (returns copy).""" return dict(self._metadata)
[docs] def plot(self, gid: str | int | None = None, graph_options: dict | None = None): """ Generate a plot of residualized outcomes for treated and control groups. Creates a time series plot comparing the average residualized outcomes between treated and control units across all time periods. A vertical line indicates the treatment start period. Parameters ---------- gid : str or int, optional Specific unit ID to highlight. If provided, plots the individual unit trajectory along with group averages. graph_options : dict, optional Matplotlib customization options including 'title', 'xlabel', 'ylabel', 'figsize', 'colors', and other styling parameters. Returns ------- matplotlib.figure.Figure The generated matplotlib figure object. Raises ------ ValueError If results.data is not set (plotting requires transformed data). See Also -------- plot_event_study : Event study visualization for staggered designs. """ from .visualization import prepare_plot_data, plot_results if self.data is None: raise ValueError("results.data is not set; plotting requires transformed data") if isinstance(self.data.attrs, dict) and 'id_mapping' not in self.data.attrs: self.data.attrs['id_mapping'] = self._metadata.get('id_mapping') tindex = 'tindex' ivar = self._metadata['ivar'] tvar = self._metadata['tvar'] if isinstance(tvar, str): period_labels = { int(t): str(int(year)) for t, year in self.data.groupby(tindex)[tvar].first().items() } else: year_var, quarter_var = tvar[0], tvar[1] period_labels = {} for t in self.data[tindex].unique(): row = self.data[self.data[tindex] == t].iloc[0] year_val = int(row[year_var]) quarter_val = int(row[quarter_var]) period_labels[int(t)] = f"{year_val}q{quarter_val}" Tmax = int(self.data[tindex].max()) tpost1 = int(self._metadata['tpost1']) plot_data = prepare_plot_data( data=self.data, ydot_var='ydot', d_var='d_', tindex_var='tindex', ivar_var=ivar, gid=gid, tpost1=tpost1, Tmax=Tmax, period_labels=period_labels, ) fig = plot_results(plot_data, graph_options=graph_options) return fig
[docs] def plot_event_study( self, ref_period: int | None = 0, show_ci: bool = True, aggregation: str = 'mean', include_pre_treatment: bool = True, alpha: float = 0.05, df_strategy: str = 'conservative', title: str | None = None, xlabel: str | None = None, ylabel: str | None = None, figsize: tuple = (10, 6), savefig: str | None = None, dpi: int = 150, ax: matplotlib.axes.Axes | None = None, return_data: bool = False, **kwargs ): """ Generate an event study diagram for staggered DiD results. Aggregates cohort-time specific effects by event time (e = r - g) and visualizes dynamic treatment effects relative to a reference period. Parameters ---------- ref_period : int or None, optional Reference period for normalization (event time). Default is 0 (first treatment period). If None, no normalization is performed. show_ci : bool, optional Whether to display confidence interval shading. Default True. aggregation : {'mean', 'weighted'}, optional Cross-cohort aggregation method. 'mean' computes simple average with SE = sqrt(sum(se^2))/n. 'weighted' uses cohort weights with SE = sqrt(sum(w^2 * se^2)). Default 'mean'. include_pre_treatment : bool, optional Whether to include pre-treatment periods (e < 0). Default True. alpha : float, optional Significance level for confidence intervals. Default 0.05 (95% CI). df_strategy : {'conservative', 'weighted', 'fallback'}, optional Strategy for selecting degrees of freedom for t-distribution: - 'conservative': min(df_g) across cohorts (default) - 'weighted': weighted average of df_g - 'fallback': n_cohorts - 1 title : str, optional Plot title. Default 'Event Study: Dynamic Treatment Effects'. xlabel : str, optional X-axis label. Default 'Event Time (Periods Since Treatment)'. ylabel : str, optional Y-axis label. Default 'Treatment Effect'. figsize : tuple of int, optional Figure size in inches (width, height). Default (10, 6). savefig : str, optional File path to save the figure. If provided, saves automatically. dpi : int, optional Resolution for saved figure. Default 150. ax : matplotlib.axes.Axes, optional Existing axes object to plot on. If None, creates new figure. return_data : bool, optional If True, also returns the aggregated event study DataFrame. Default False. **kwargs Additional keyword arguments passed to matplotlib plotting functions. Returns ------- fig : matplotlib.figure.Figure The matplotlib figure object. ax : matplotlib.axes.Axes The matplotlib axes object. event_df : pd.DataFrame Aggregated event study data. Only returned if return_data=True. Raises ------ ValueError If called on non-staggered results or if att_by_cohort_time is empty or None. Notes ----- Confidence intervals use t-distribution rather than normal distribution for proper small-sample inference. The degrees of freedom are selected based on the df_strategy parameter. See Also -------- plot : Residualized outcomes plot for common timing designs. """ if not self.is_staggered: raise ValueError("Event study plot requires staggered DiD results") if self.att_by_cohort_time is None or self.att_by_cohort_time.empty: raise ValueError("att_by_cohort_time is empty; cannot create event study plot") import matplotlib.pyplot as plt import numpy as np import warnings # pandas version compatibility: include_groups only available in pandas 2.0+ _pandas_version = tuple(int(x) for x in pd.__version__.split('.')[:2]) _groupby_apply_kwargs = {'include_groups': False} if _pandas_version >= (2, 0) else {} # Aggregate by event_time df = self.att_by_cohort_time.copy() # Auto-compute event_time column if not present if 'event_time' not in df.columns: df['event_time'] = df['period'] - df['cohort'] # Add source column to distinguish post-treatment effects df['_source'] = 'post_treatment' # Merge pre-treatment effects if available and requested has_pre_treatment_data = ( include_pre_treatment and self.include_pretreatment and self.att_pre_treatment is not None and len(self.att_pre_treatment) > 0 ) if has_pre_treatment_data: pre_df = self.att_pre_treatment.copy() pre_df['_source'] = 'pre_treatment' # Ensure consistent columns for merging common_cols = ['cohort', 'period', 'event_time', 'att', 'se', 'ci_lower', 'ci_upper', 't_stat', 'pvalue', 'n_treated', 'n_control', '_source'] # Add is_anchor column if not present in post-treatment if 'is_anchor' not in df.columns: df['is_anchor'] = False if 'is_anchor' in pre_df.columns: common_cols.append('is_anchor') # Select only common columns that exist in both DataFrames df_cols = [c for c in common_cols if c in df.columns] pre_cols = [c for c in common_cols if c in pre_df.columns] # Combine pre and post treatment effects df = pd.concat([ df[df_cols], pre_df[pre_cols] ], ignore_index=True) # Filter pre-treatment if needed (only from post-treatment data) if not include_pre_treatment: df = df[df['event_time'] >= 0] if aggregation == 'weighted' and self.cohort_sizes: # Weighted aggregation using aggregate_to_event_time for proper t-distribution CI from lwdid.staggered.aggregation import aggregate_to_event_time, event_time_effects_to_dataframe # Use actual cohort_sizes for proper weight computation cohort_sizes = self.cohort_sizes try: watt_effects = aggregate_to_event_time( cohort_time_effects=df, cohort_sizes=cohort_sizes, alpha=alpha, df_strategy=df_strategy, verbose=False, ) event_df = event_time_effects_to_dataframe(watt_effects) except (ValueError, KeyError) as e: # Fallback to simple weighted aggregation if aggregate_to_event_time fails warnings.warn( f"aggregate_to_event_time failed ({e}), using fallback weighted aggregation", UserWarning ) df['weight'] = df['cohort'].map(self.cohort_weights).fillna(0) def weighted_agg(x): if x['weight'].sum() > 0: att = np.average(x['att'], weights=x['weight']) weights_norm = x['weight'] / x['weight'].sum() se = np.sqrt(np.sum((weights_norm ** 2) * (x['se'] ** 2))) else: att = x['att'].mean() se = np.sqrt((x['se'] ** 2).mean()) return pd.Series({'att': att, 'se': se, 'n_cohorts': len(x)}) event_df = df.groupby('event_time').apply( weighted_agg, **_groupby_apply_kwargs ).reset_index() # Use t-distribution for CI with fallback df from scipy.stats import t as t_dist df_inference = max(1, len(event_df) - 1) t_crit = t_dist.ppf(1 - alpha / 2, df_inference) event_df['ci_lower'] = event_df['att'] - t_crit * event_df['se'] event_df['ci_upper'] = event_df['att'] + t_crit * event_df['se'] event_df['df_inference'] = df_inference else: # Simple average aggregation (analytical SE assumes independence across cohorts) from scipy.stats import t as t_dist # Warn about independence assumption for analytical SE n_cohorts = len(df['cohort'].unique()) if 'cohort' in df.columns else 0 if n_cohorts > 1: warnings.warn( f"Analytical SE assumes independence across {n_cohorts} cohorts. " f"When cohorts share control units, this may underestimate SE " f"(confidence intervals may be too narrow). " f"Consider using se_method='bootstrap' for more accurate SE.", UserWarning, stacklevel=2 ) def simple_agg(x): att = x['att'].mean() # SE aggregation: for independent estimates, variance of mean = Var(ΣX/n) = ΣVar(X)/n² # Therefore SE = √(Σse²) / n n = len(x) se = np.sqrt((x['se'] ** 2).sum()) / n # Get df_inference: use min across cohorts (conservative) if 'df_inference' in x.columns: valid_dfs = x['df_inference'].dropna() df_inf = int(valid_dfs.min()) if len(valid_dfs) > 0 else max(1, n - 1) else: df_inf = max(1, n - 1) return pd.Series({'att': att, 'se': se, 'n_cohorts': n, 'df_inference': df_inf}) event_df = df.groupby('event_time').apply( simple_agg, **_groupby_apply_kwargs ).reset_index() # Calculate CI using t-distribution (NOT fixed z=1.96) event_df['ci_lower'] = event_df.apply( lambda row: row['att'] - t_dist.ppf(1 - alpha / 2, row['df_inference']) * row['se'], axis=1 ) event_df['ci_upper'] = event_df.apply( lambda row: row['att'] + t_dist.ppf(1 - alpha / 2, row['df_inference']) * row['se'], axis=1 ) event_df = event_df.sort_values('event_time') # Normalize to reference period if ref_period is not None: ref_row = event_df[event_df['event_time'] == ref_period] if len(ref_row) > 0: ref_att = ref_row['att'].values[0] event_df['att'] = event_df['att'] - ref_att event_df['ci_lower'] = event_df['ci_lower'] - ref_att event_df['ci_upper'] = event_df['ci_upper'] - ref_att else: warnings.warn( f"Reference period e={ref_period} not found in data. " f"Available event times: {sorted(event_df['event_time'].unique())}. " f"Skipping normalization.", UserWarning ) # Create plot if ax is None: fig, ax = plt.subplots(figsize=figsize) else: fig = ax.get_figure() # Get colors from kwargs or use defaults pre_treatment_color = kwargs.get('pre_treatment_color', 'gray') post_treatment_color = kwargs.get('post_treatment_color', 'blue') anchor_line = kwargs.get('anchor_line', True) # Reference lines ax.axhline(y=0, color='gray', linestyle='--', alpha=0.5, linewidth=1) ax.axvline(x=-0.5, color='red', linestyle='--', alpha=0.7, linewidth=1.5, label='Treatment Start') # Add anchor point line at e=-1 if pre-treatment data is shown if has_pre_treatment_data and anchor_line: ax.axvline(x=-1, color='darkgray', linestyle=':', alpha=0.5, linewidth=1, label='Anchor Point (e=-1)') # Separate pre and post treatment for different styling pre_mask = event_df['event_time'] < 0 post_mask = event_df['event_time'] >= 0 pre_events = event_df[pre_mask].copy() post_events = event_df[post_mask].copy() # Confidence interval shading if show_ci: ci_level = int((1 - alpha) * 100) # Post-treatment CI (blue) if len(post_events) > 0: ax.fill_between( post_events['event_time'], post_events['ci_lower'], post_events['ci_upper'], alpha=0.2, color=post_treatment_color, label=f'{ci_level}% CI (Post)' ) # Pre-treatment CI (gray) - only if we have pre-treatment data if len(pre_events) > 0 and has_pre_treatment_data: ax.fill_between( pre_events['event_time'], pre_events['ci_lower'], pre_events['ci_upper'], alpha=0.15, color=pre_treatment_color, label=f'{ci_level}% CI (Pre)' ) # Point estimates - post-treatment (blue) if len(post_events) > 0: ax.scatter(post_events['event_time'], post_events['att'], color=post_treatment_color, s=60, zorder=5, label='Post-treatment') ax.plot(post_events['event_time'], post_events['att'], color=post_treatment_color, alpha=0.7, linewidth=1.5) # Point estimates - pre-treatment (gray) if len(pre_events) > 0: # Mark anchor point differently anchor_mask = pre_events.get('is_anchor', pd.Series([False] * len(pre_events))) if anchor_mask.any(): anchor_events = pre_events[anchor_mask] non_anchor_events = pre_events[~anchor_mask] # Non-anchor pre-treatment points if len(non_anchor_events) > 0: ax.scatter(non_anchor_events['event_time'], non_anchor_events['att'], color=pre_treatment_color, s=60, zorder=5, marker='o', label='Pre-treatment') ax.plot(non_anchor_events['event_time'], non_anchor_events['att'], color=pre_treatment_color, alpha=0.7, linewidth=1.5, linestyle='--') # Anchor point (diamond marker) if len(anchor_events) > 0: ax.scatter(anchor_events['event_time'], anchor_events['att'], color=pre_treatment_color, s=100, zorder=6, marker='D', edgecolors='black', linewidths=1, label='Anchor (e=-1)') else: # No anchor info, plot all pre-treatment the same ax.scatter(pre_events['event_time'], pre_events['att'], color=pre_treatment_color, s=60, zorder=5, marker='o', label='Pre-treatment') ax.plot(pre_events['event_time'], pre_events['att'], color=pre_treatment_color, alpha=0.7, linewidth=1.5, linestyle='--') # Connect pre and post treatment with a line if both exist if len(pre_events) > 0 and len(post_events) > 0: # Get the last pre-treatment and first post-treatment points last_pre = pre_events.loc[pre_events['event_time'].idxmax()] first_post = post_events.loc[post_events['event_time'].idxmin()] ax.plot([last_pre['event_time'], first_post['event_time']], [last_pre['att'], first_post['att']], color='gray', alpha=0.4, linewidth=1, linestyle=':') # Labels and title ax.set_xlabel(xlabel or 'Event Time (Periods Since Treatment)', fontsize=11) ax.set_ylabel(ylabel or 'Treatment Effect', fontsize=11) ax.set_title(title or 'Event Study: Dynamic Treatment Effects', fontsize=13, fontweight='bold') # X-axis ticks as integers ax.set_xticks(sorted(event_df['event_time'].astype(int).unique())) # Legend ax.legend(loc='best', framealpha=0.9) # Grid ax.grid(True, alpha=0.3, linestyle=':') plt.tight_layout() # Save if savefig: plt.savefig(savefig, dpi=dpi, bbox_inches='tight') if return_data: return fig, ax, event_df return fig, ax
[docs] def to_excel(self, path: str): """ Export estimation results to an Excel file. For common timing designs, creates a workbook with Summary sheet containing ATT, SE, t-statistic, p-value, CI bounds, and sample sizes. If period-specific effects are available, includes a ByPeriod sheet. For staggered designs, dispatches to to_excel_staggered(). Parameters ---------- path : str File path for the Excel output (.xlsx extension recommended). See Also -------- to_csv : Export period-specific effects to CSV format. to_latex : Export results to LaTeX table format. """ if self.is_staggered: return self.to_excel_staggered(path) summary_rows = [ {"Statistic": "ATT", "Value": self.att}, {"Statistic": "SE", "Value": self.se_att}, {"Statistic": "t", "Value": self.t_stat}, {"Statistic": "p", "Value": self.pvalue}, {"Statistic": "CI_lower", "Value": self.ci_lower}, {"Statistic": "CI_upper", "Value": self.ci_upper}, {"Statistic": "N", "Value": self.nobs}, {"Statistic": "N_treated", "Value": self.n_treated}, {"Statistic": "N_control", "Value": self.n_control}, {"Statistic": "vce", "Value": self.vce_type}, {"Statistic": "rolling", "Value": self.rolling}, ] if self.ri_pvalue is not None: summary_rows.extend([ {"Statistic": "ri_pvalue", "Value": self.ri_pvalue}, {"Statistic": "ri_seed", "Value": self.ri_seed}, {"Statistic": "rireps", "Value": self.rireps}, ]) df_summary = pd.DataFrame(summary_rows) with pd.ExcelWriter(path) as writer: df_summary.to_excel(writer, sheet_name='Summary', index=False) if isinstance(self.att_by_period, pd.DataFrame) and not self.att_by_period.empty: self.att_by_period.to_excel(writer, sheet_name='ByPeriod', index=False) if self.ri_pvalue is not None: df_ri = pd.DataFrame([ {"Parameter": "ri_pvalue", "Value": self.ri_pvalue}, {"Parameter": "ri_seed", "Value": self.ri_seed}, {"Parameter": "rireps", "Value": self.rireps}, {"Parameter": "ATT_obs", "Value": self.att}, ]) df_ri.to_excel(writer, sheet_name='RI', index=False)
[docs] def to_excel_staggered(self, path: str): """ Export staggered DiD results to a multi-sheet Excel file. Creates an Excel workbook with sheets tailored to the aggregation level. The Summary sheet is always included. Additional sheets depend on the aggregate parameter used during estimation. Parameters ---------- path : str File path for the Excel output (.xlsx extension required). Raises ------ ValueError If called on non-staggered estimation results. ImportError If openpyxl package is not installed. Notes ----- Sheet structure varies by aggregation level: - aggregate='overall': Summary, Overall, Cohort, CohortTime, Weights, Metadata - aggregate='cohort': Summary, Cohort, CohortTime, Weights, Metadata - aggregate='none': Summary, CohortTime, Metadata """ if not self.is_staggered: raise ValueError("to_excel_staggered requires staggered DiD results") try: import openpyxl # noqa: F401 except ImportError: raise ImportError( "to_excel_staggered requires openpyxl. " "Install it with: pip install openpyxl" ) with pd.ExcelWriter(path, engine='openpyxl') as writer: # === Sheet 1: Summary (always present) === summary_rows = [ {"Item": "Estimation Type", "Value": "Staggered DiD"}, {"Item": "Treatment Cohorts", "Value": ', '.join(map(str, self.cohorts))}, {"Item": "Number of Cohorts", "Value": len(self.cohorts)}, {"Item": "N Treated Units", "Value": self.n_treated}, {"Item": "N Control Units", "Value": self.n_control}, {"Item": "N Never Treated", "Value": self.n_never_treated}, {"Item": "Control Group", "Value": self.control_group_used}, {"Item": "Transformation", "Value": self.rolling}, {"Item": "Estimator", "Value": self.estimator}, {"Item": "Aggregation", "Value": self.aggregate}, {"Item": "VCE Type", "Value": self.vce_type}, ] # Only add overall effect info when aggregate='overall' if self.att_overall is not None: summary_rows.extend([ {"Item": "Overall ATT (τ_ω)", "Value": self.att_overall}, {"Item": "Overall SE", "Value": self.se_overall}, {"Item": "Overall CI Lower", "Value": self.ci_overall_lower}, {"Item": "Overall CI Upper", "Value": self.ci_overall_upper}, ]) df_summary = pd.DataFrame(summary_rows) df_summary.to_excel(writer, sheet_name='Summary', index=False) # === Sheet 2: Overall Effect (only when aggregate='overall') === if self.att_overall is not None: df_overall = pd.DataFrame([{ 'att_overall': self.att_overall, 'se': self.se_overall, 't_stat': self.t_stat_overall, 'pvalue': self.pvalue_overall, 'ci_lower': self.ci_overall_lower, 'ci_upper': self.ci_overall_upper, }]) df_overall.to_excel(writer, sheet_name='Overall', index=False) # === Sheet 3: Cohort Effects (only when aggregate∈{'cohort','overall'}) === if self.att_by_cohort is not None and not self.att_by_cohort.empty: self.att_by_cohort.to_excel(writer, sheet_name='Cohort', index=False) # === Sheet 4: Cohort-Time Effects (always present) === if self.att_by_cohort_time is not None and not self.att_by_cohort_time.empty: self.att_by_cohort_time.to_excel(writer, sheet_name='CohortTime', index=False) # === Sheet 5: Cohort Weights (only when aggregate∈{'cohort','overall'}) === if self.cohort_weights: df_weights = pd.DataFrame([ {'cohort': g, 'weight': w, 'n_units': self.cohort_sizes.get(g, None)} for g, w in sorted(self.cohort_weights.items()) ]) df_weights.to_excel(writer, sheet_name='Weights', index=False) # === Sheet 6: Metadata (always present) === metadata_rows = [ {"Parameter": "is_staggered", "Value": True}, {"Parameter": "control_group", "Value": self.control_group}, {"Parameter": "control_group_used", "Value": self.control_group_used}, {"Parameter": "aggregate", "Value": self.aggregate}, {"Parameter": "estimator", "Value": self.estimator}, {"Parameter": "rolling", "Value": self.rolling}, {"Parameter": "vce_type", "Value": self.vce_type}, {"Parameter": "n_never_treated", "Value": self.n_never_treated}, ] df_metadata = pd.DataFrame(metadata_rows) df_metadata.to_excel(writer, sheet_name='Metadata', index=False)
[docs] def to_csv(self, path: str): """ Export period-specific treatment effects to a CSV file. Parameters ---------- path : str File path for the CSV output. Raises ------ ValueError If att_by_period is not available (None or empty DataFrame). See Also -------- to_excel : Export comprehensive results to Excel format. """ if not isinstance(self.att_by_period, pd.DataFrame) or self.att_by_period.empty: raise ValueError("att_by_period is not available for CSV export") self.att_by_period.to_csv(path, index=False)
[docs] def to_latex(self, path: str): """ Export estimation results to a LaTeX table file. Generates a LaTeX document containing summary statistics (ATT, SE, t-statistic, p-value, CI bounds, sample sizes) and period-specific effects if available. Parameters ---------- path : str File path for the LaTeX output (.tex extension recommended). See Also -------- to_excel : Export comprehensive results to Excel format. to_csv : Export period-specific effects to CSV format. """ summary_rows = [ ["ATT", f"{self.att:.6g}"], ["SE", f"{self.se_att:.6g}"], ["t", f"{self.t_stat:.6g}"], ["p", f"{self.pvalue:.6g}"], ["CI_lower", f"{self.ci_lower:.6g}"], ["CI_upper", f"{self.ci_upper:.6g}"], ["N", f"{self.nobs}"], ["N_treated", f"{self.n_treated}"], ["N_control", f"{self.n_control}"], ["vce", f"{self.vce_type}"], ["rolling", f"{self.rolling}"], ] if self.ri_pvalue is not None: summary_rows.extend([ ["ri_pvalue", f"{self.ri_pvalue:.6g}"], ["ri_seed", f"{self.ri_seed}"], ["rireps", f"{self.rireps}"], ]) df_summary = pd.DataFrame(summary_rows, columns=["Statistic", "Value"]) content = [] content.append(df_summary.to_latex(index=False, escape=True)) if isinstance(self.att_by_period, pd.DataFrame) and not self.att_by_period.empty: content.append(self.att_by_period.to_latex(index=False, escape=True, na_rep='--')) with open(path, 'w', encoding='utf-8') as f: f.write("\n\n".join(content))