首页 > 基础资料 博客日记

多尺度地理加权回归与地理探测器集成分析方法及实现

2026-06-01 14:00:05基础资料围观1

这篇文章介绍了多尺度地理加权回归与地理探测器集成分析方法及实现,分享给大家做个参考,收藏极客资料网收获更多编程知识

多尺度地理加权回归与地理探测器集成分析方法及实现

摘要
本文介绍一种将多尺度地理加权回归(MGWR)与地理探测器相结合的空间分析集成框架。该框架可广泛应用于社会、经济、环境等领域,用于量化解释变量的空间异质性、探测因子交互效应,并通过留一交叉验证自动筛选最优预测模型。代码实现以非地图统计图表和结构化数据表为核心输出,涵盖MGWR局部系数、地理探测器q值、交互矩阵、多模型精度对比、残差分析等全套结果。文章对核心算法与关键代码段进行原理阐释,并在文末提供可直接运行的完整代码,便于研究者快速迁移至自身数据场景。


1. 方法概述

在空间数据分析中,研究者通常需要回答三个递进的问题:

  1. 各解释变量对目标变量的影响是否存在空间非平稳性?——由 多尺度地理加权回归(MGWR) 给出每个空间单元的局部系数。
  2. 单因子及因子间的交互作用能在多大程度上解释目标变量的空间分异?——由 地理探测器 计算 q 统计量并判断交互类型。
  3. 在给定的数据集中,哪种统计或机器学习模型能提供最可靠的目标变量预测?——通过 留一交叉验证(LOOCV) 对七种候选模型进行精度比较,自动选出最优模型。

本代码将上述三个模块无缝衔接,并额外计算全局 Moran’s I 以检验目标变量的空间自相关性,全部结果输出为 Excel 表格与论文级统计图,全程无需手动操作。框架本身不依赖于特定学科背景,用户仅需按格式准备数据即可一键运行。


2. 环境配置与数据约定

依赖库:

import pandas as pd, numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
import seaborn as sns
from mgwr.gwr import MGWR
from mgwr.sel_bw import Sel_BW
from sklearn.model_selection import LeaveOneOut, cross_val_predict
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from esda.moran import Moran
from libpysal.weights import Queen

输入文件:

  • data.csv(或 .xlsx):至少包含 3 列——经度 lon、纬度 lat、目标变量(示例中为 XHM),以及若干特征列(示例中 Feat1~Feat7)。
  • shp/市.shp:研究区面状矢量数据,用于建立空间权重并计算 Moran’s I;若不需要空间自相关分析,可提供任一 shp 文件占位。

自定义映射:
所有特征与目标变量的中文名称均在字典 FEATURE_CN_MAP 中配置,例如:

FEATURE_CN_MAP = {
    "Feat1": "人均GDP",
    "Feat2": "专利/万人",
    # 按实际情况修改
}
TARGET_COL = "XHM"

框架会基于此映射自动生成图表中的变量标签,使其具有可读性。


3. 核心功能实现拆解

3.1 MGWR 建模与局部系数提取

MGWR 是地理加权回归(GWR)的扩展,允许不同变量拥有各自的空间带宽,从而更细致地刻画不同解释因素的空间作用尺度差异。

处理流程:

  1. 对自变量进行 Z-score 标准化,消除量纲影响;
  2. 使用黄金分割搜索算法(Sel_BW)为每个变量(含截距)寻找最优带宽;
  3. 拟合 MGWR 模型,提取每个样本点的预测值、残差及各变量局部系数;
  4. 计算模型拟合精度:训练 R²、调整 R²、RMSE、MAE、AIC、AICc、BIC。

关键代码片段:

def run_mgwr(data):
    x = data[FEATURE_COLS].values
    y = data[TARGET_COL].values.reshape((-1, 1))
    coords = list(zip(data["lon"], data["lat"]))

    scaler = StandardScaler()
    x_scaled = scaler.fit_transform(x)

    # 带宽搜索范围设置
    n_obs = len(data)
    n_features_with_const = x_scaled.shape[1] + 1
    min_bw = max(n_features_with_const + 2, 10)
    max_bw = n_obs - 1

    selector = Sel_BW(coords, y, x_scaled, multi=True)
    bw = selector.search(bw_min=min_bw, bw_max=max_bw)

    # 模型拟合
    model = MGWR(coords, y, x_scaled, selector)
    results = model.fit()
    pred = results.predy.flatten()
    y_flat = y.flatten()

    # 提取局部系数
    params = results.params
    result_data = data.copy()
    result_data["Intercept_coef"] = params[:, 0]
    for idx, col in enumerate(FEATURE_COLS):
        result_data[f"{col}_coef"] = params[:, idx + 1]
        result_data[f"{FEATURE_CN_MAP[col]}_局部系数"] = params[:, idx + 1]

    # 精度指标
    metrics = pd.DataFrame([{
        "模型": "MGWR",
        "训练R2": r2_score(y_flat, pred),
        "调整R2": adjusted_r2(y_flat, pred, len(FEATURE_COLS)),
        "RMSE": rmse(y_flat, pred),
        "MAE": mean_absolute_error(y_flat, pred)
    }])
    return result_data, metrics, bw_table, coef_summary

注意: 若样本量过小(小于变量数 + 2),MGWR 无法进行有效的带宽搜索,代码会提前抛出清晰错误提示。

3.2 地理探测器(单因子与交互探测)

地理探测器基于“如果自变量对因变量有重要影响,那么自变量与因变量的空间分布应趋于一致”的假设,通过层内方差和总方差的比值构造 q 统计量

$$
q = 1 - \frac{\sum_{h=1}^{L} N_h \sigma_h^2}{N \sigma^2}
$$

式中,$L$ 为分类数,$N_h$ 与 $\sigma_h^2$ 分别为第 $h$ 层的样本数与方差。q 值范围为 $[0, 1]$,值越大表示该因子对目标变量的解释力越强。

连续变量离散化: 由于地理探测器要求输入为类型变量,代码默认按分位数将连续变量分为 5 类:

def discretize(series, bins=5):
    return pd.qcut(series, q=bins, duplicates="drop")

交互探测 通过将两因子的分类标签交叉拼接生成新的组合分类,计算 $q_{12}$,并与 $q_1$、$q_2$ 比较判断交互类型:

def classify_interaction(q1, q2, q12):
    if q12 < min(q1, q2):          return "非线性减弱"
    elif q12 < max(q1, q2):        return "单因子非线性减弱"
    elif np.isclose(q12, q1+q2):   return "独立"
    elif q12 < q1+q2:              return "双因子增强"
    else:                          return "非线性增强"

最终输出包含 q 值排序表、对称交互矩阵、以及拆分的交互类型明细表。

3.3 多模型预测精度自动比较

考虑到研究样本量通常有限,代码采用 留一交叉验证(LOOCV) 评估模型泛化能力。参与比较的模型涵盖线性模型(OLS、Ridge、Lasso)、支持向量回归(RBF 核)以及三种集成树模型(随机森林、极端随机树、梯度提升树)。

def compare_prediction_models(data):
    x = data[FEATURE_COLS].values
    y = data[TARGET_COL].values
    loo = LeaveOneOut()

    models = {
        "线性回归": Pipeline([...]),
        "Ridge":    Pipeline([...]),
        # ...
    }

    for name, model in models.items():
        cv_pred = cross_val_predict(model, x, y, cv=loo)
        model.fit(x, y)
        train_pred = model.predict(x)
        # 记录训练R2、LOOCV_R2、RMSE、MAE

    accuracy = pd.DataFrame(rows).sort_values("LOOCV_R2", ascending=False)
    best_model = fitted_models[accuracy.iloc[0]["模型"]]
    return accuracy, best_pred_table, feature_importance

最终按 LOOCV_R2 降序排列,将最优模型的全量预测值、LOOCV 预测值及残差单独输出,若最优模型为树模型,则同时输出变量重要性。

3.4 空间自相关检验

基于 Queen 邻接权重矩阵计算目标变量的全局 Moran’s I,作为空间依赖性的诊断依据:

def compute_moran(data, gdf):
    merged = gdf.merge(data[[CITY_FIELD, TARGET_COL]], on=CITY_FIELD)
    w = Queen.from_dataframe(merged)
    w.transform = 'r'
    moran = Moran(merged[TARGET_COL], w)
    return pd.DataFrame([{"Moran_I": moran.I, "P_value": moran.p_sim, "Z_score": moran.z_sim}])

4. 可视化输出与结果导出

make_charts 函数自动生成 11 张适用于论文展示的统计图表(均为非地图类):

图号 内容 图表类型
01 地理探测器 q 值排序 横向条形图
02 影响因子解释力棒棒糖图 克利夫兰点图
03 交互探测热力图 对称矩阵热力图
04 Top 12 交互组合 q 值 分组条形图
05 七种模型精度对比 水平点线图
06 最优模型真实值 vs 预测值 散点图+回归线
07 最优模型 LOOCV 残差分布 直方图+核密度
08 最优模型变量重要性(若可得) 条形图
09 变量间相关系数热力图 相关矩阵热力图
10 MGWR 局部系数分布箱线图 箱线图
11 各城市局部系数热力图 矩阵热力图

所有图表以 600 dpi 分辨率保存至输出目录,文件名自动处理特殊字符。同时,所有中间结果表合并写入一个 Excel 工作簿,并额外导出主要结果(q 值、交互矩阵、精度对比、预测值等)的 CSV 副本。


5. 使用说明与注意事项

  1. 路径与数据准备:修改 EXCEL_PATHSHP_PATHOUT_DIR 三个变量至实际路径;确保 CSV 中经度、纬度、目标变量及特征列名与脚本一致。
  2. 特征变量灵活配置:只需在 FEATURE_CN_MAP 字典中增删条目,脚本会自动适配后续分析。
  3. 样本量要求:MGWR 要求样本量 ≥ 变量数(含截距)+ 2,否则抛出错误;地理探测器在样本量很小时 q 值可能不稳定。
  4. 离散化层数discretize 函数默认将连续变量分 5 类,可根据研究需要调整 bins 参数。
  5. 运行环境:推荐 Python 3.8 及以上版本,依赖库可通过以下命令安装:
    pip install geopandas mgwr esda libpysal seaborn openpyxl scikit-learn
    
  6. 自定义修改:若希望调整模型超参数或增加新的比较模型,可直接在 compare_prediction_models 函数的 models 字典中修改。

6. 完整代码

以下代码已去除特定领域背景,直接复制即可运行。用户仅需配置输入路径与特征映射字典。

# -*- coding: utf-8 -*-
"""
MGWR + 地理探测器 + 精度提升框架
--------------------------------
输出内容:
1. MGWR 模型精度、带宽、预测值、残差、局部系数数据表
2. 地理探测器 q 值、交互探测矩阵与交互类型数据表
3. 非地图型论文图表:q 值图、交互热力图、精度对比图、拟合图、残差图等
4. 自动比较多种预测模型,给出当前数据下精度最高的模型
"""

import os
import warnings
from pathlib import Path

warnings.filterwarnings("ignore")

import geopandas as gpd
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from esda.moran import Moran
from libpysal.weights import Queen
from mgwr.gwr import MGWR
from mgwr.sel_bw import Sel_BW
from sklearn.ensemble import ExtraTreesRegressor, GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import LeaveOneOut, cross_val_predict
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR


# =========================================================
# 1. 参数设置(根据实际数据修改此部分)
# =========================================================

EXCEL_PATH = r"data.csv"            # 属性数据路径
SHP_PATH = r"shp\市.shp"            # 矢量面数据路径
OUT_DIR = r"./output"              # 输出文件夹

CITY_FIELD = "name"                # 城市名称字段(若不存在,会按经纬度匹配)
LON_COL = "lon"
LAT_COL = "lat"
TARGET_COL = "XHM"                # 目标变量

# 特征变量中英文映射(可按需增删)
FEATURE_CN_MAP = {
    "Feat1": "人均GDP",
    "Feat2": "专利/万人",
    "Feat3": "对外开放度",
    "Feat4": "产业高级化",
    "Feat5": "科技支出占比",
    "Feat6": "交通可达性",
    "Feat7": "普惠金融指数",
}

TARGET_CN_NAME = "目标变量"
FEATURE_COLS = list(FEATURE_CN_MAP.keys())
FEATURE_NAMES = [FEATURE_CN_MAP[col] for col in FEATURE_COLS]

os.makedirs(OUT_DIR, exist_ok=True)

# 图形全局设置
mpl.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei", "Arial Unicode MS"]
mpl.rcParams["axes.unicode_minus"] = False
mpl.rcParams["figure.dpi"] = 120
mpl.rcParams["savefig.dpi"] = 600
mpl.rcParams["axes.linewidth"] = 1.1
sns.set_theme(style="whitegrid", font="SimHei")

INVALID_FILENAME_CHARS = str.maketrans({char: "_" for char in r'\/:*?"<>|'})


def safe_filename(name):
    return name.translate(INVALID_FILENAME_CHARS)


def save_fig(filename):
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, safe_filename(filename)), bbox_inches="tight", facecolor="white")
    plt.close()


def rmse(y_true, y_pred):
    return mean_squared_error(y_true, y_pred) ** 0.5


def adjusted_r2(y_true, y_pred, p):
    n = len(y_true)
    r2 = r2_score(y_true, y_pred)
    return 1 - (1 - r2) * (n - 1) / (n - p - 1)


def load_data(path):
    file_ext = os.path.splitext(path)[1].lower()
    if file_ext == ".csv":
        try:
            data = pd.read_csv(path, encoding="utf-8")
        except UnicodeDecodeError:
            data = pd.read_csv(path, encoding="gbk")
    elif file_ext == ".xlsx":
        data = pd.read_excel(path, engine="openpyxl")
    elif file_ext == ".xls":
        data = pd.read_excel(path, engine="xlrd")
    else:
        raise ValueError(f"不支持的数据文件格式:{file_ext}")

    data = data.drop(columns=[col for col in data.columns if str(col).startswith("Unnamed")], errors="ignore")
    return data


def load_shp(path):
    if os.path.exists(path):
        return gpd.read_file(path)

    shp_files = list(Path("shp").glob("*.shp"))
    if not shp_files:
        raise FileNotFoundError("shp 文件夹中未找到 .shp 文件,无法计算 Moran's I。")
    return gpd.read_file(shp_files[0])


def fill_city_name_by_coords(data, gdf):
    if CITY_FIELD in data.columns:
        return data

    if {"x", "y"}.issubset(gdf.columns):
        shp_coords = gdf[["x", "y"]].to_numpy()
    else:
        shp_centroids = gdf.geometry.centroid
        shp_coords = np.column_stack([shp_centroids.x, shp_centroids.y])

    data_coords = data[[LON_COL, LAT_COL]].to_numpy()
    distances = np.sqrt(((data_coords[:, None, :] - shp_coords[None, :, :]) ** 2).sum(axis=2))
    nearest_idx = distances.argmin(axis=1)

    if len(set(nearest_idx)) != len(data):
        raise ValueError("经纬度匹配 shp 城市时出现重复匹配,请检查数据坐标。")

    data[CITY_FIELD] = gdf.iloc[nearest_idx][CITY_FIELD].to_numpy()
    print(f"CSV 缺少 {CITY_FIELD} 字段,已按经纬度匹配 shp 城市。")
    return data


def geodetector_q(y, x):
    temp = pd.DataFrame({"y": y, "x": x}).dropna()
    total_n = len(temp)
    total_var = np.var(temp["y"], ddof=1)
    if total_var == 0 or total_n == 0:
        return np.nan

    q = 1
    for group in temp["x"].unique():
        sub = temp[temp["x"] == group]
        if len(sub) <= 1:
            group_var = 0
        else:
            group_var = np.var(sub["y"], ddof=1)
        q -= (len(sub) * group_var) / (total_n * total_var)
    return q


def discretize(series, bins=5):
    return pd.qcut(series, q=bins, duplicates="drop")


def classify_interaction(q1, q2, q12):
    min_q = min(q1, q2)
    max_q = max(q1, q2)
    sum_q = q1 + q2

    if q12 < min_q:
        return "非线性减弱"
    if min_q <= q12 < max_q:
        return "单因子非线性减弱"
    if np.isclose(q12, sum_q):
        return "独立"
    if max_q < q12 < sum_q:
        return "双因子增强"
    if q12 > sum_q:
        return "非线性增强"
    return "增强"


def run_mgwr(data):
    x = data[FEATURE_COLS].values
    y = data[TARGET_COL].values.reshape((-1, 1))
    coords = list(zip(data[LON_COL], data[LAT_COL]))

    scaler = StandardScaler()
    x_scaled = scaler.fit_transform(x)

    n_obs = len(data)
    n_features_with_const = x_scaled.shape[1] + 1
    min_bw = max(n_features_with_const + 2, 10)
    max_bw = n_obs - 1

    if max_bw < min_bw:
        raise ValueError(
            f"样本量过小,无法进行 MGWR 带宽搜索:样本量={n_obs},"
            f"变量数含截距={n_features_with_const}。"
        )

    print(f"\nMGWR 带宽搜索中:样本量={n_obs},搜索范围={min_bw}-{max_bw}")

    selector = Sel_BW(coords, y, x_scaled, multi=True, n_jobs=1)
    bw = selector.search(
        init_multi=max_bw,
        bw_min=min_bw,
        bw_max=max_bw,
        multi_bw_min=[min_bw],
        multi_bw_max=[max_bw],
    )

    print("最优带宽:", bw)

    model = MGWR(coords, y, x_scaled, selector)
    results = model.fit()
    pred = results.predy.flatten()
    y_flat = y.flatten()

    result_data = data.copy()
    result_data["MGWR_pred"] = pred
    result_data["MGWR_resid"] = y_flat - pred
    result_data["MGWR_abs_resid"] = np.abs(result_data["MGWR_resid"])

    params = results.params
    result_data["Intercept_coef"] = params[:, 0]
    for idx, col in enumerate(FEATURE_COLS):
        result_data[f"{col}_coef"] = params[:, idx + 1]
        result_data[f"{FEATURE_CN_MAP[col]}_局部系数"] = params[:, idx + 1]

    metrics = pd.DataFrame(
        [
            {
                "模型": "MGWR",
                "训练R2": r2_score(y_flat, pred),
                "调整R2": adjusted_r2(y_flat, pred, len(FEATURE_COLS)),
                "RMSE": rmse(y_flat, pred),
                "MAE": mean_absolute_error(y_flat, pred),
                "AIC": getattr(results, "aic", np.nan),
                "AICc": getattr(results, "aicc", np.nan),
                "BIC": getattr(results, "bic", np.nan),
            }
        ]
    )

    bw_table = pd.DataFrame(
        {
            "变量": ["截距"] + FEATURE_NAMES,
            "带宽": np.asarray(bw).astype(float),
        }
    )

    coef_summary = result_data[
        ["Intercept_coef"] + [f"{col}_coef" for col in FEATURE_COLS]
    ].describe().T.reset_index()
    coef_summary["变量"] = ["截距"] + FEATURE_NAMES
    coef_summary = coef_summary[["变量", "mean", "std", "min", "25%", "50%", "75%", "max"]]

    return result_data, metrics, bw_table, coef_summary


def compare_prediction_models(data):
    x = data[FEATURE_COLS].values
    y = data[TARGET_COL].values
    loo = LeaveOneOut()

    models = {
        "线性回归": Pipeline([("scaler", StandardScaler()), ("model", LinearRegression())]),
        "Ridge": Pipeline([("scaler", StandardScaler()), ("model", Ridge(alpha=1.0))]),
        "Lasso": Pipeline([("scaler", StandardScaler()), ("model", Lasso(alpha=0.001, max_iter=10000))]),
        "SVR-RBF": Pipeline([("scaler", StandardScaler()), ("model", SVR(C=10, gamma="scale", epsilon=0.01))]),
        "随机森林": RandomForestRegressor(n_estimators=300, random_state=42, min_samples_leaf=1),
        "极端随机树": ExtraTreesRegressor(n_estimators=300, random_state=42, min_samples_leaf=1),
        "梯度提升树": GradientBoostingRegressor(n_estimators=200, learning_rate=0.03, max_depth=2, random_state=42),
    }

    rows = []
    predictions = {}
    fitted_models = {}

    print("\n比较预测模型精度(Leave-One-Out 交叉验证)...")
    for name, model in models.items():
        cv_pred = cross_val_predict(model, x, y, cv=loo)
        model.fit(x, y)
        train_pred = model.predict(x)

        rows.append(
            {
                "模型": name,
                "训练R2": r2_score(y, train_pred),
                "LOOCV_R2": r2_score(y, cv_pred),
                "LOOCV_RMSE": rmse(y, cv_pred),
                "LOOCV_MAE": mean_absolute_error(y, cv_pred),
            }
        )

        predictions[name] = {"train": train_pred, "cv": cv_pred}
        fitted_models[name] = model

    accuracy = pd.DataFrame(rows).sort_values("LOOCV_R2", ascending=False)
    best_name = accuracy.iloc[0]["模型"]
    best_model = fitted_models[best_name]

    best_pred_table = data[[CITY_FIELD, TARGET_COL]].copy()
    best_pred_table["最优模型"] = best_name
    best_pred_table["预测值"] = predictions[best_name]["train"]
    best_pred_table["LOOCV预测值"] = predictions[best_name]["cv"]
    best_pred_table["训练残差"] = best_pred_table[TARGET_COL] - best_pred_table["预测值"]
    best_pred_table["LOOCV残差"] = best_pred_table[TARGET_COL] - best_pred_table["LOOCV预测值"]

    feature_importance = None
    if hasattr(best_model, "feature_importances_"):
        feature_importance = pd.DataFrame(
            {
                "变量": FEATURE_NAMES,
                "重要性": best_model.feature_importances_,
            }
        ).sort_values("重要性", ascending=False)

    return accuracy, best_pred_table, feature_importance


def run_geodetector(data):
    q_rows = []
    bins_map = {}

    for col in FEATURE_COLS:
        bins = discretize(data[col], bins=5)
        bins_map[col] = bins
        q_rows.append(
            {
                "变量": FEATURE_CN_MAP[col],
                "字段": col,
                "q值": geodetector_q(data[TARGET_COL], bins),
            }
        )

    q_df = pd.DataFrame(q_rows).sort_values("q值", ascending=False)

    interaction_matrix = pd.DataFrame(index=FEATURE_NAMES, columns=FEATURE_NAMES, dtype=float)
    interaction_rows = []

    q_lookup = dict(zip(q_df["字段"], q_df["q值"]))
    for col1 in FEATURE_COLS:
        for col2 in FEATURE_COLS:
            interaction = bins_map[col1].astype(str) + "_" + bins_map[col2].astype(str)
            q_inter = geodetector_q(data[TARGET_COL], interaction)
            interaction_matrix.loc[FEATURE_CN_MAP[col1], FEATURE_CN_MAP[col2]] = q_inter

            if FEATURE_COLS.index(col1) < FEATURE_COLS.index(col2):
                q1 = q_lookup[col1]
                q2 = q_lookup[col2]
                interaction_rows.append(
                    {
                        "变量1": FEATURE_CN_MAP[col1],
                        "变量2": FEATURE_CN_MAP[col2],
                        "q1": q1,
                        "q2": q2,
                        "q交互": q_inter,
                        "交互增量": q_inter - max(q1, q2),
                        "交互类型": classify_interaction(q1, q2, q_inter),
                    }
                )

    interaction_long = pd.DataFrame(interaction_rows).sort_values("q交互", ascending=False)
    return q_df, interaction_matrix, interaction_long


def compute_moran(data, gdf):
    merged = gdf.merge(data[[CITY_FIELD, TARGET_COL]], on=CITY_FIELD)
    w = Queen.from_dataframe(merged)
    w.transform = "r"
    moran = Moran(merged[TARGET_COL], w)
    return pd.DataFrame(
        [
            {
                "指标": TARGET_CN_NAME,
                "Moran_I": moran.I,
                "P_value": moran.p_sim,
                "Z_score": moran.z_sim,
            }
        ]
    )


def make_charts(result_data, q_df, interaction_matrix, interaction_long, accuracy, best_pred_table, feature_importance):
    print("\n绘制非地图统计图表...")

    palette = sns.color_palette("Blues_r", n_colors=len(q_df))

    plt.figure(figsize=(10, 6))
    ax = sns.barplot(data=q_df, x="q值", y="变量", palette=palette)
    for container in ax.containers:
        ax.bar_label(container, fmt="%.3f", padding=4, fontsize=10)
    ax.set_title("地理探测器 q 值排序", fontsize=18, pad=14)
    ax.set_xlabel("q 值")
    ax.set_ylabel("")
    ax.grid(axis="x", linestyle="--", alpha=0.25)
    save_fig("01_地理探测器_q值排序.png")

    plt.figure(figsize=(10, 6))
    sorted_q = q_df.sort_values("q值")
    plt.hlines(y=sorted_q["变量"], xmin=0, xmax=sorted_q["q值"], color="#b7c9d6", linewidth=3)
    plt.scatter(sorted_q["q值"], sorted_q["变量"], s=180, c=sorted_q["q值"], cmap="viridis", edgecolor="white", linewidth=1.5)
    for _, row in sorted_q.iterrows():
        plt.text(row["q值"] + 0.008, row["变量"], f"{row['q值']:.3f}", va="center", fontsize=10)
    plt.title("影响因子解释力棒棒糖图", fontsize=18, pad=14)
    plt.xlabel("q 值")
    plt.ylabel("")
    plt.grid(axis="x", linestyle="--", alpha=0.25)
    save_fig("02_q值棒棒糖图.png")

    plt.figure(figsize=(9.5, 8))
    sns.heatmap(
        interaction_matrix,
        annot=True,
        fmt=".3f",
        cmap="RdYlBu_r",
        square=True,
        linewidths=0.8,
        linecolor="white",
        cbar_kws={"label": "q 交互值"},
    )
    plt.title("地理探测器交互探测热力图", fontsize=18, pad=14)
    plt.xticks(rotation=35, ha="right")
    plt.yticks(rotation=0)
    save_fig("03_交互探测热力图.png")

    top_interactions = interaction_long.head(12).copy()
    top_interactions["组合"] = top_interactions["变量1"] + " × " + top_interactions["变量2"]
    plt.figure(figsize=(11, 7))
    ax = sns.barplot(data=top_interactions, x="q交互", y="组合", hue="交互类型", dodge=False, palette="Set2")
    for container in ax.containers:
        ax.bar_label(container, fmt="%.3f", padding=3, fontsize=9)
    ax.set_title("Top 12 交互解释力组合", fontsize=18, pad=14)
    ax.set_xlabel("q 交互值")
    ax.set_ylabel("")
    ax.grid(axis="x", linestyle="--", alpha=0.25)
    ax.legend(title="交互类型", loc="lower right")
    save_fig("04_Top交互组合.png")

    acc_plot = accuracy.sort_values("LOOCV_R2", ascending=True)
    plt.figure(figsize=(10, 6))
    plt.hlines(acc_plot["模型"], 0, acc_plot["训练R2"], color="#d7e3ef", linewidth=8, label="训练R2")
    plt.scatter(acc_plot["训练R2"], acc_plot["模型"], s=100, color="#377eb8", label="训练R2")
    plt.scatter(acc_plot["LOOCV_R2"], acc_plot["模型"], s=120, color="#e41a1c", label="LOOCV_R2")
    for _, row in acc_plot.iterrows():
        plt.text(row["LOOCV_R2"] + 0.015, row["模型"], f"{row['LOOCV_R2']:.3f}", va="center", fontsize=9)
    plt.axvline(0, color="black", linewidth=0.8)
    plt.title("模型精度对比:训练集 vs 留一交叉验证", fontsize=18, pad=14)
    plt.xlabel("R2")
    plt.ylabel("")
    plt.legend()
    plt.grid(axis="x", linestyle="--", alpha=0.25)
    save_fig("05_模型精度对比.png")

    plt.figure(figsize=(7.5, 7))
    sns.regplot(data=best_pred_table, x=TARGET_COL, y="预测值", scatter_kws={"s": 70, "alpha": 0.85}, line_kws={"color": "#e41a1c"})
    min_val = min(best_pred_table[TARGET_COL].min(), best_pred_table["预测值"].min())
    max_val = max(best_pred_table[TARGET_COL].max(), best_pred_table["预测值"].max())
    plt.plot([min_val, max_val], [min_val, max_val], "--", color="gray", linewidth=1)
    best_model = best_pred_table["最优模型"].iloc[0]
    train_r2 = r2_score(best_pred_table[TARGET_COL], best_pred_table["预测值"])
    cv_r2 = r2_score(best_pred_table[TARGET_COL], best_pred_table["LOOCV预测值"])
    plt.title(f"{best_model} 拟合效果:训练R2={train_r2:.3f},LOOCV_R2={cv_r2:.3f}", fontsize=15, pad=14)
    plt.xlabel("真实值")
    plt.ylabel("预测值")
    save_fig("06_最优模型真实值_vs_预测值.png")

    plt.figure(figsize=(9, 5.5))
    sns.histplot(best_pred_table["LOOCV残差"], kde=True, color="#4c78a8", bins=10)
    plt.axvline(0, color="#e41a1c", linestyle="--", linewidth=1.2)
    plt.title("最优模型 LOOCV 残差分布", fontsize=18, pad=14)
    plt.xlabel("LOOCV 残差")
    plt.ylabel("频数")
    save_fig("07_最优模型残差分布.png")

    if feature_importance is not None:
        plt.figure(figsize=(9, 6))
        ax = sns.barplot(data=feature_importance, x="重要性", y="变量", palette="mako")
        for container in ax.containers:
            ax.bar_label(container, fmt="%.3f", padding=3, fontsize=9)
        ax.set_title("最优预测模型变量重要性", fontsize=18, pad=14)
        ax.set_xlabel("重要性")
        ax.set_ylabel("")
        ax.grid(axis="x", linestyle="--", alpha=0.25)
        save_fig("08_最优模型变量重要性.png")

    corr_df = result_data[FEATURE_COLS + [TARGET_COL]].rename(columns={**FEATURE_CN_MAP, TARGET_COL: TARGET_CN_NAME}).corr()
    plt.figure(figsize=(9, 8))
    sns.heatmap(corr_df, annot=True, fmt=".2f", cmap="vlag", center=0, square=True, linewidths=0.8, linecolor="white")
    plt.title("变量相关性热力图", fontsize=18, pad=14)
    plt.xticks(rotation=35, ha="right")
    plt.yticks(rotation=0)
    save_fig("09_变量相关性热力图.png")

    coef_cols = [f"{FEATURE_CN_MAP[col]}_局部系数" for col in FEATURE_COLS]
    coef_long = result_data[[CITY_FIELD] + coef_cols].melt(id_vars=CITY_FIELD, var_name="变量", value_name="局部系数")
    coef_long["变量"] = coef_long["变量"].str.replace("_局部系数", "", regex=False)
    plt.figure(figsize=(11, 6.5))
    sns.boxplot(data=coef_long, x="局部系数", y="变量", palette="coolwarm")
    plt.axvline(0, color="black", linestyle="--", linewidth=1)
    plt.title("MGWR 局部系数分布箱线图", fontsize=18, pad=14)
    plt.xlabel("局部系数")
    plt.ylabel("")
    save_fig("10_MGWR局部系数箱线图.png")

    city_coef = result_data.set_index(CITY_FIELD)[coef_cols]
    city_coef.columns = [col.replace("_局部系数", "") for col in city_coef.columns]
    plt.figure(figsize=(9, max(8, len(city_coef) * 0.22)))
    sns.heatmap(city_coef, cmap="RdBu_r", center=0, linewidths=0.25, linecolor="white", cbar_kws={"label": "局部系数"})
    plt.title("城市-变量 MGWR 局部系数热力图", fontsize=18, pad=14)
    plt.xticks(rotation=35, ha="right")
    plt.yticks(fontsize=8)
    save_fig("11_城市变量局部系数热力图.png")


def export_tables(result_data, mgwr_metrics, bw_table, coef_summary, q_df, interaction_matrix, interaction_long, moran_df, accuracy, best_pred_table, feature_importance):
    print("\n导出 Excel 数据表...")
    out_xlsx = os.path.join(OUT_DIR, "MGWR_地理探测器_精度结果表.xlsx")

    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
        result_data.to_excel(writer, sheet_name="城市预测残差与局部系数", index=False)
        mgwr_metrics.to_excel(writer, sheet_name="MGWR精度", index=False)
        bw_table.to_excel(writer, sheet_name="MGWR带宽", index=False)
        coef_summary.to_excel(writer, sheet_name="MGWR局部系数汇总", index=False)
        q_df.to_excel(writer, sheet_name="地理探测器q值", index=False)
        interaction_matrix.to_excel(writer, sheet_name="交互探测矩阵")
        interaction_long.to_excel(writer, sheet_name="交互探测排序", index=False)
        moran_df.to_excel(writer, sheet_name="Moran空间自相关", index=False)
        accuracy.to_excel(writer, sheet_name="模型精度对比", index=False)
        best_pred_table.to_excel(writer, sheet_name="最优模型预测表", index=False)
        if feature_importance is not None:
            feature_importance.to_excel(writer, sheet_name="最优模型变量重要性", index=False)

    q_df.to_csv(os.path.join(OUT_DIR, "地理探测器q值.csv"), index=False, encoding="utf-8-sig")
    interaction_matrix.to_csv(os.path.join(OUT_DIR, "交互探测矩阵.csv"), encoding="utf-8-sig")
    interaction_long.to_csv(os.path.join(OUT_DIR, "交互探测排序.csv"), index=False, encoding="utf-8-sig")
    accuracy.to_csv(os.path.join(OUT_DIR, "模型精度对比.csv"), index=False, encoding="utf-8-sig")
    result_data.to_csv(os.path.join(OUT_DIR, "城市预测残差与局部系数.csv"), index=False, encoding="utf-8-sig")

    return out_xlsx


def main():
    print("\n读取数据...")
    data = load_data(EXCEL_PATH)
    gdf = load_shp(SHP_PATH)
    data = fill_city_name_by_coords(data, gdf)
    print(data.head())

    result_data, mgwr_metrics, bw_table, coef_summary = run_mgwr(data)
    q_df, interaction_matrix, interaction_long = run_geodetector(data)
    moran_df = compute_moran(data, gdf)
    accuracy, best_pred_table, feature_importance = compare_prediction_models(data)

    make_charts(result_data, q_df, interaction_matrix, interaction_long, accuracy, best_pred_table, feature_importance)
    out_xlsx = export_tables(
        result_data,
        mgwr_metrics,
        bw_table,
        coef_summary,
        q_df,
        interaction_matrix,
        interaction_long,
        moran_df,
        accuracy,
        best_pred_table,
        feature_importance,
    )

    best_row = accuracy.iloc[0]
    mgwr_row = mgwr_metrics.iloc[0]

    print("\n===================================")
    print("分析完成:已取消地图输出,仅保留表格和非地图统计图。")
    print(f"MGWR 精度:R2={mgwr_row['训练R2']:.4f},调整R2={mgwr_row['调整R2']:.4f},RMSE={mgwr_row['RMSE']:.4f},MAE={mgwr_row['MAE']:.4f}")
    print(f"当前最优预测模型:{best_row['模型']},训练R2={best_row['训练R2']:.4f},LOOCV_R2={best_row['LOOCV_R2']:.4f},LOOCV_RMSE={best_row['LOOCV_RMSE']:.4f}")
    print(f"结果表:{out_xlsx}")
    print(f"输出目录:{OUT_DIR}")
    print("===================================")


if __name__ == "__main__":
    main()

成图为(由于数据敏感,故ai出图):

ChatGPT Image 2026年6月1日 13_31_42


文章来源:https://www.cnblogs.com/Laurentianelle/p/20249885
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云