向量自回归模型,简称VAR模型,是AR 模型的推广,是一种常用的计量经济模型。在一定的条件下,多元MA和ARMA模型也可转化成VAR模型。
单变量的时间序列的分析模式可以推广到多变量时间序列,建立向量自回归模型。向量自回归模型通常用于描述多变量时间序列之间的变动关系。
多变量时间序列具有一个以上的时间依赖变量。每个变量不仅取决于其过去的值,而且还对其他变量有一定的依赖性。
在VAR模型中,每个变量是其自身过去值和所有其他变量的过去值的线性函数。
模型的基本形式是弱平稳过程的自回归表达式,描述的是在同一样本期间内的若干变量可以作为它们过去值的线性函数。
在VAR模型中,必须保证时间序列稳定。如果不能保证时间序列稳定,那么会导致两种结果:
第一,向量自回归系数的估计值是负数,做完 t 检验后,得到的结果是无效的;
第二,两个独立变量的相关关系或者回归关系是假的,使得模型的结果无效。
import numpy as np import pandas as pd import matplotlib.pyplot as plt from statsmodels.tsa.api import VAR from statsmodels.tsa.stattools import adfuller, grangercausalitytests from statsmodels.stats.stattools import durbin_watson import warnings warnings.filterwarnings('ignore')# 设置中文显示 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False# 1. 生成示例数据 np.random.seed(42) n_obs = 200 date_range = pd.date_range(start='2010-01-01', periods=n_obs, freq='M')# 生成三个相关的时间序列 data = pd.DataFrame(index=date_range) data['GDP'] = np.cumsum(np.random.normal(0.5, 0.2, n_obs)) + 100 data['Inflation'] = np.sin(np.arange(n_obs) * 0.1) + np.random.normal(0, 0.1, n_obs) + 2 data['Unemployment'] = np.cos(np.arange(n_obs) * 0.05) + np.random.normal(0, 0.1, n_obs) + 5# 2. 数据可视化 plt.figure(figsize=(12, 8)) for i, col in enumerate(data.columns, 1):plt.subplot(3, 1, i)plt.plot(data.index, data[col])plt.title(f'{col} 时间序列')plt.grid(True) plt.tight_layout() plt.show()# 3. 平稳性检验 print("平稳性检验 (ADF检验):") for col in data.columns:result = adfuller(data[col])print(f"{col}: ADF统计量 = {result[0]:.4f}, p值 = {result[1]:.4f}")# 4. 格兰杰因果检验 print("\n格兰杰因果检验:") max_lag = 4 for col1 in data.columns:for col2 in data.columns:if col1 != col2:print(f"{col2} -> {col1}:")test_result = grangercausalitytests(data[[col1, col2]], maxlag=max_lag, verbose=False)p_values = [test_result[i+1][0]['ssr_chi2test'][1] for i in range(max_lag)]print(f" p值: {[f'{p:.4f}' for p in p_values]}")# 5. 差分处理使数据平稳 data_diff = data.diff().dropna()# 6. 确定最佳滞后阶数 model = VAR(data_diff) lag_results = model.select_order(maxlags=10) print(f"\n最佳滞后阶数: {lag_results.selected_orders}")# 7. 拟合VAR模型 best_lag = lag_results.selected_orders['aic'] var_model = model.fit(best_lag) print("\n模型摘要:") print(var_model.summary())# 8. 残差检验 print("\nDurbin-Watson检验 (接近2表示无自相关):") residuals = var_model.resid dw_results = durbin_watson(residuals) for i, col in enumerate(data.columns):print(f"{col}: {dw_results[i]:.4f}")# 9. 预测 forecast_steps = 12 forecast = var_model.forecast(y=data_diff.values[-best_lag:], steps=forecast_steps) forecast_index = pd.date_range(start=data.index[-1] + pd.DateOffset(months=1), periods=forecast_steps, freq='M')# 将差分预测值转换为原始尺度 last_values = data.iloc[-1].values forecast_original = np.zeros_like(forecast) for i in range(forecast_steps):if i == 0:forecast_original[i] = last_values + forecast[i]else:forecast_original[i] = forecast_original[i-1] + forecast[i]forecast_df = pd.DataFrame(forecast_original, index=forecast_index, columns=data.columns)# 10. 可视化预测结果 plt.figure(figsize=(14, 10))# 绘制每个变量的历史数据和预测 for i, col in enumerate(data.columns, 1):plt.subplot(3, 1, i)plt.plot(data.index, data[col], label='历史数据')plt.plot(forecast_df.index, forecast_df[col], label='预测', color='red', linestyle='--')plt.title(f'{col} - 历史数据与预测')plt.legend()plt.grid(True)plt.tight_layout() plt.show()# 11. 脉冲响应分析 irf = var_model.irf(periods=20) irf.plot(orth=False, figsize=(12, 10)) plt.suptitle('脉冲响应函数', fontsize=16) plt.tight_layout() plt.show()# 12. 方差分解 fevd = var_model.fevd(periods=20) fevd_summary = fevd.summary() print("\n方差分解:") print(fevd_summary)# 13. 预测评估(如果需要测试集) train_size = int(len(data) * 0.8) train_data = data.iloc[:train_size] test_data = data.iloc[train_size:]# 在训练数据上重新拟合模型 train_diff = train_data.diff().dropna() var_model_train = VAR(train_diff).fit(best_lag)# 预测测试集 forecast_test = var_model_train.forecast(y=train_diff.values[-best_lag:], steps=len(test_data))# 计算预测误差 from sklearn.metrics import mean_squared_error, mean_absolute_errorfor i, col in enumerate(data.columns):mse = mean_squared_error(test_data[col].iloc[1:], forecast_test[1:, i] + train_data[col].iloc[-1])mae = mean_absolute_error(test_data[col].iloc[1:], forecast_test[1:, i] + train_data[col].iloc[-1])print(f"{col} - MSE: {mse:.4f}, MAE: {mae:.4f}")