import os
import geopandas as gpd
import numpy as np
import pandas as pd
import statsmodels.api as sm
from prereise.gather.demanddata.bldg_electrification import const
from prereise.gather.demanddata.bldg_electrification.helper import read_shapefile
[docs]def puma_county_shp_overlay(pumas_shp, county_shp):
"""Align all counties that are within survey areas of AHS(American Housing Survey)
to PUMAs geographically
:param geopandas.GeoDataFrame pumas_shp: geo data frame of pumas shape file
:param geopandas.GeoDataFrame county_shp: geo data frame of county shape file
:return (*pandas.DataFrame*) -- puma data of all pumas within the survey areas of
AHS, including fraction within the zone. Row indices are id's of puma that is
covered by AHS survey data. Columns include household heat pump possession
rate from AHS, fraction of geographical area each PUMA in each county,
and other end-use energy information in "/data/puma_data.csv"
"""
county_shp.index = county_shp["GEOID"]
county_shp.sort_index(inplace=True)
pumas_shp["area"] = pumas_shp["geometry"].to_crs({"proj": "cea"}).area
# read AHS metropolitan data on HP
hp_metro_ahs = pd.read_excel(
os.path.join(os.path.dirname("__file__"), "data", "hp_ahs.xlsx"), dtype=str
)
hp_metro_ahs["counties"] = (
hp_metro_ahs.filter(like="county").stack().groupby(level=0).apply(list).tolist()
)
for i in range(len(hp_metro_ahs)):
county_shp.loc[
county_shp.index.isin(hp_metro_ahs.loc[i, "counties"]), "metro_id"
] = i
county_shp.loc[
county_shp.index.isin(hp_metro_ahs.loc[i, "counties"]), "HP rate"
] = hp_metro_ahs.loc[i, "HP rate"]
county_shp = county_shp[~county_shp["metro_id"].isna()]
county_shp = county_shp.copy()
county_shp["HP rate"] = county_shp["HP rate"].astype("float64")
county_shp["metro_id"] = county_shp["metro_id"].astype("int64")
county_shp = county_shp.dissolve(by="metro_id")
county_shp["metro_id"] = county_shp.index
puma_metro = gpd.overlay(pumas_shp, county_shp, keep_geom_type=True)
puma_metro["area"] = puma_metro["geometry"].to_crs({"proj": "cea"}).area
puma_metro["area_frac"] = [
puma_metro["area"][i]
/ list(pumas_shp[pumas_shp["puma"] == puma_metro["puma"][i]]["area"])[0]
for i in range(len(puma_metro))
]
puma_metro = puma_metro.drop(puma_metro[puma_metro["area_frac"] <= 0.01].index)
puma_data_metro = pd.DataFrame(
{
"puma": puma_metro["puma"],
"frac_in_zone": puma_metro["area_frac"],
"metro_id": puma_metro["metro_id"],
"hp_penetration": puma_metro["HP rate"],
}
)
puma_data = const.puma_data
puma_data_metro = puma_data_metro.join(puma_data, on="puma")
puma_data_metro = puma_data_metro.set_index("puma")
ng_price = pd.read_csv(
os.path.join(
os.path.dirname(__file__), "data", "state_ng_residential_price_2019.csv"
),
index_col=0,
)
puma_data_metro["ng_price"] = puma_data_metro.apply(
lambda x: ng_price.loc[x.loc["state"], "$/kWh"], axis=1
)
return puma_data_metro
[docs]def metro_data(puma_data):
"""Aggregate puma metrics to the population weighted metropolitan area values of AHS
survey
:param pandas.DataFrame puma_data: puma data within metropolitan areas, output of
:py:func: `puma_county_shp_overlay()`
:return (*pandas.DataFrame*) stats -- metropolitan area data to be used for heat
pump penetration linear regression model. The row indices includes all
metropolitan areas with AHS heat pump data. Columns include data on the fraction
of floor area that use electricity for space heating, average natural gas
price for year 2019, normal year HDD, and AHS heat pump penetration rate.
"""
puma_fuel = const.puma_fuel
stats = pd.DataFrame()
for id, puma_df in puma_data.groupby("metro_id"):
puma_fuel_metro = puma_fuel[puma_fuel.index.isin(puma_df.index)]
puma_pop_weights = (puma_df["pop"] * puma_df["frac_in_zone"]) / sum(
puma_df["pop"] * puma_df["frac_in_zone"]
)
stats.loc[id, "frac_elec_htg"] = puma_fuel_metro["hh_elec"].mul(
puma_pop_weights, axis=0
).sum(axis=0) / puma_fuel_metro["hh_total"].mul(puma_pop_weights, axis=0).sum(
axis=0
)
stats.loc[id, "frac_elec_htg_sq"] = stats.loc[id, "frac_elec_htg"] ** 2
stats.loc[id, "log_hdd"] = np.log(
(puma_df["hdd65_normals"] * puma_pop_weights).sum()
)
stats.loc[id, "natgas_price"] = (
puma_data["ng_price"].mul(puma_pop_weights, axis=0).sum(axis=0)
)
stats.loc[id, "hp_penetration"] = puma_df["hp_penetration"][0]
return stats
[docs]def hp_penetration_fit(stats):
"""Fit current heat pump penetration rate on metropolitan area data
:param pandas.DataFrame stats: metropolitan area data, output of
:py:func: `metro_data()`
:return: (*tuple*) -- hp_fit_df: slope and intercept of heat pump penetration
regression model, fit_stats: fitting statistics of heat pump penetration
regression model; hp_fit_df_low_elec: slope and intercept of a simpler heat
pump penetration model fitted on data points with low electrification rate
of buildings; fit_stats_low_elec: fitting statistics of a simpler heat pump
penetration model fitted on data points with low electrification rate of
buildings.
"""
lm_hp = sm.OLS(
stats["hp_penetration"],
sm.add_constant(
np.array(
stats[
[
"frac_elec_htg",
"log_hdd",
"natgas_price",
]
]
)
),
).fit()
print(
"fitting statistics summary of heat pump penetration model:", lm_hp.summary()
) # check the fitting statistics
hp_fit_df = pd.Series(
data=[
lm_hp.params[0],
lm_hp.params[1],
lm_hp.params[2],
lm_hp.params[3],
],
index=[
"const",
"s_frac_elec_htg",
"s_log_hdd",
"s_natgas_price",
],
)
fit_stats = pd.Series(
data=[
lm_hp.bse[0],
lm_hp.bse[1],
lm_hp.bse[2],
lm_hp.bse[3],
lm_hp.pvalues[1],
lm_hp.pvalues[2],
lm_hp.pvalues[3],
lm_hp.rsquared,
],
index=[
"stderr_const",
"stderr_frac_elec_htg",
"stderr_log_hdd",
"stderr_natgas_price",
"pvalue_frac_elec_htg",
"pvalue_log_hdd",
"pvalue_natgas_price",
"rsquared",
],
)
# It's observed that hp penetration has low variance when electric heating
# rate is low, and high variance when electric heating rate is high. So fit a
# simpler model with less variance for pumas with lower electric heating
# penetration as follows
lm_hp_low = sm.OLS(
stats["hp_penetration"],
sm.add_constant(
np.array(
stats[
[
"frac_elec_htg",
"frac_elec_htg_sq",
]
]
)
),
).fit()
hp_fit_df_low_elec = pd.Series(
data=[
lm_hp_low.params[0],
lm_hp_low.params[1],
lm_hp_low.params[2],
],
index=[
"const",
"s_frac_elec_htg",
"s_frac_elec_htg_sq",
],
)
fit_stats_low_elec = pd.Series(
data=[
lm_hp_low.bse[0],
lm_hp_low.bse[1],
lm_hp_low.bse[2],
lm_hp_low.pvalues[1],
lm_hp_low.pvalues[2],
lm_hp_low.rsquared,
],
index=[
"stderr_const",
"stderr_frac_elec_htg",
"stderr_frac_elec_htg_sq",
"pvalue_frac_elec_htg",
"pvalue_frac_elec_htg_sq",
"rsquared",
],
)
return hp_fit_df, fit_stats, hp_fit_df_low_elec, fit_stats_low_elec
[docs]def estimate_puma_hp_penetration(hp_fit_df, hp_fit_df_low_elec, puma_data_metro):
"""Compute current heat pump penetration for PUMAs, and then scale the values to
align with regional inputs from RECS and CBECS surveys. When compute heat pump
penetration rate from the regression model, the simpler model with less variance
is chosen when the fraction of electrified heating is less than 0.27 (by default)
as observed from the relationship between current heat pump penetration rate and
the fraction of electrified heating
:param pandas.DataFrame hp_fit_df: slope and intercept of heat pump penetration
regression model, output of :py:func:`hp_penetration_fit()`
:param pandas.DataFrame hp_fit_df_low_elec: slope and intercept of heat pump
penetration regression model for low electrified locations, output of
:py:func: `hp_penetration_fit()`
:param pandas.DataFrame puma_data_metro: puma data within metropolitan areas,
output of :py:func: `puma_county_shp_overlay()`
:return: (*pandas.DataFrame*) hp_puma_df: current heat pump penetration estimations
for pumas that has been scaled based on RECS and CBECS surveys. The indices
of this dataframe includes all PUMA id's in the CONUS. The columns include
estimates of current heat pump penetration result for residential and commercial
building stock
"""
puma_fuel = const.puma_fuel
puma_data = const.puma_data
natgas_price = pd.read_csv(
os.path.join(
os.path.dirname(__file__), "data", "state_ng_residential_price_2019.csv"
),
index_col=0,
)
puma_hp_df = pd.DataFrame(
{
"state": puma_fuel["state"],
"frac_elec_htg": puma_fuel["hh_elec"] / puma_fuel["hh_total"],
"log_hdd": np.log(puma_data["hdd65_normals"]),
"natgas_price": puma_fuel.apply(
lambda x: natgas_price.loc[x.loc["state"], "$/kWh"], axis=1
),
"hh_total": puma_fuel["hh_total"],
},
index=puma_data.index,
)
puma_hp_df["frac_hp_fitted"] = puma_hp_df.apply(
lambda x: (
x["frac_elec_htg"] * hp_fit_df["s_frac_elec_htg"]
+ x["log_hdd"] * hp_fit_df["s_log_hdd"]
+ x["natgas_price"] * hp_fit_df["s_natgas_price"]
+ hp_fit_df["const"]
)
if x["frac_elec_htg"] > 0.27
else (
x["frac_elec_htg"] * hp_fit_df_low_elec["s_frac_elec_htg"]
+ x["frac_elec_htg"] ** 2 * hp_fit_df_low_elec["s_frac_elec_htg_sq"]
+ hp_fit_df_low_elec["const"]
),
axis=1,
) # 0.27 is an empirical threshold
puma_hp_df["frac_hp_fitted"].clip(lower=0, inplace=True)
puma_hp_df["hh_hp"] = puma_hp_df["frac_hp_fitted"] * puma_hp_df["hh_total"]
# scale hp penetration to align with AHS metropolitan area data
puma_data_metro["frac_hp_fitted"] = puma_hp_df["frac_hp_fitted"]
puma_data_metro["hh_total"] = puma_hp_df["hh_total"]
puma_data_metro["hh_hp_fitted"] = (
puma_data_metro["frac_hp_fitted"] * puma_data_metro["hh_total"]
)
puma_data_metro["hh_hp_target"] = (
puma_data_metro["hp_penetration"] * puma_data_metro["hh_total"]
)
puma_metro = puma_data_metro.groupby("metro_id")[
["hh_hp_fitted", "hh_hp_target"]
].sum()
scaler = puma_metro["hh_hp_target"] / puma_metro["hh_hp_fitted"]
# if a puma is in two metropolitan area, then scale it by the metro area that
# is larger overlaying with the puma
puma_data_metro.sort_values("frac_in_zone", ascending=False, inplace=True)
puma_data_metro = puma_data_metro[~puma_data_metro.index.duplicated(keep="first")]
puma_data_metro = puma_data_metro.sort_index()
puma_data_metro["frac_hp_scaled"] = puma_data_metro.apply(
lambda x: x.loc["frac_hp_fitted"] * scaler[x.loc["metro_id"]], axis=1
)
puma_hp_df.loc[
puma_hp_df.index.isin(puma_data_metro.index), "frac_hp_fitted"
] = puma_data_metro["frac_hp_scaled"]
puma_hp_df["hh_hp"] = puma_hp_df["frac_hp_fitted"] * puma_hp_df["hh_total"]
# scale hp penetration to align with RECS and CBECS survey
for clas in const.classes:
hp_target = pd.read_csv(
os.path.join(
os.path.dirname(__file__), "data", f"frac_target_hp_{clas}.csv"
),
index_col=0,
)
puma_hp_df[f"census_region_{clas}"] = puma_hp_df.apply(
lambda x: [
i
for i in const.recs_cbecs_census_region[clas]
if x["state"] in const.recs_cbecs_census_region[clas][i]
][0],
axis=1,
)
census_df = puma_hp_df.groupby(f"census_region_{clas}")[
["hh_hp", "hh_total"]
].sum()
census_df["hh_hp_target"] = hp_target["hp_target"] * census_df["hh_total"]
scaler = census_df["hh_hp_target"] / census_df["hh_hp"]
puma_hp_df[f"frac_hp_{clas}"] = puma_hp_df.apply(
lambda x: (x["frac_hp_fitted"] * scaler[x[f"census_region_{clas}"]]), axis=1
)
puma_hp_df[f"hh_hp_{clas}"] = (
puma_hp_df[f"frac_hp_{clas}"] * puma_hp_df["hh_total"]
)
puma_hp_df[f"frac_hp_{clas}"].clip(
upper=puma_hp_df["frac_elec_htg"], inplace=True
)
hp_puma_df = pd.DataFrame(
{
"state": puma_hp_df["state"],
"frac_hp_res": puma_hp_df["frac_hp_res"],
"frac_hp_com": puma_hp_df["frac_hp_com"],
}
)
return hp_puma_df
if __name__ == "__main__":
pumas_shp = read_shapefile(
"https://besciences.blob.core.windows.net/shapefiles/USA/pumas-overlay/pumas_overlay.zip"
)
county_shp = read_shapefile(
"https://besciences.blob.core.windows.net/shapefiles/USA/county-outlines/county_area.zip"
)
puma_data_metro = puma_county_shp_overlay(pumas_shp, county_shp)
stats = metro_data(puma_data_metro)
hp_fit_df, fit_stats, hp_fit_df_low_elec, fit_stats_low_elec = hp_penetration_fit(
stats
)
hp_puma_df = estimate_puma_hp_penetration(
hp_fit_df, hp_fit_df_low_elec, puma_data_metro
)
hp_puma_df.to_csv(
os.path.join(os.path.dirname("__file__"), "data", "puma_hp_data.csv")
)