Source code for prereise.gather.demanddata.bldg_electrification.ff_model

import os

import numpy as np
import pandas as pd
from scipy.optimize import least_squares

from prereise.gather.demanddata.bldg_electrification import const


[docs]def calculate_r2(endogenous, residuals): """Calculate r2 value of fit. :param iterable endogenous: vector of observations of endogenous variable. :param iterable residuals: vector of residuals between modeled fit and observations. :return: (*float*) -- r-squared value of fit. """ sumres = np.square(np.array(residuals)).sum() sumtot = np.square(np.array(endogenous) - np.array(endogenous).mean()).sum() r2 = 1 - (sumres / sumtot) return r2
[docs]def calculate_state_slopes(puma_data, year=const.base_year): """Estimate regression parameters per-state for residential and commercial fuel use. :param pandas.DataFrame puma_data: data frame of per-puma data. :param int/str year: year of data to use for analysis. :return: (*tuple*) -- a pair of pandas.DataFrame objects for per-state residential and commercial slopes, respectively. """ dti = pd.date_range(start=f"{year}-01-01", end=f"{year}-12-31 23:00:00", freq="H") hours_in_month = dti.month.value_counts() # Load in historical fossil fuel usage data for input/base year data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") ng_usage_data = { clas: pd.read_csv( os.path.join(data_dir, f"ng_monthly_mmbtu_{year}_{clas}.csv"), index_col=0 ) for clas in {"res", "com"} } fok_usage_data = pd.read_csv( os.path.join(data_dir, f"fok_data_bystate_{year}.csv"), index_col="state" ) othergas_usage_data = pd.read_csv( os.path.join(data_dir, f"propane_data_bystate_{year}.csv"), index_col="state" ) # Initialize dataframes to store state heating slopes state_slopes_res = pd.DataFrame( columns=(["state", "r2", "sh_slope", "dhw_const", "dhw_slope", "other_const"]) ) state_slopes_com = pd.DataFrame( columns=( [ "state", "r2", "sh_slope", "dhw_const", "cook_const", "other_const", "other_slope", ] ) ) for state in const.state_list: # Load puma data puma_data_it = puma_data.query("state == @state") # Load puma temperatures temps_pumas = temps_pumas = pd.read_csv( f"https://besciences.blob.core.windows.net/datasets/bldg_el/pumas/{year}/temps/temps_pumas_{state}_{year}.csv" ) temps_pumas_transpose = temps_pumas.T for clas in const.classes: # puma area * percentage of puma area that uses fossil fuel areas_ff_sh_it = ( puma_data_it[f"{clas}_area_{year}_m2"] * puma_data_it[f"frac_ff_sh_{clas}_{year}"] ) areas_ff_dhw_it = ( puma_data_it[f"{clas}_area_{year}_m2"] * puma_data_it[f"frac_ff_dhw_{clas}_{year}"] ) areas_ff_cook_it = ( puma_data_it[f"{clas}_area_{year}_m2"] * puma_data_it[f"frac_ff_cook_com_{year}"] ) if clas == "res": areas_ff_other_it = ( puma_data_it[f"{clas}_area_{year}_m2"] * puma_data_it[f"frac_ff_other_res_{year}"] ) else: areas_ff_other_it = ( puma_data_it[f"{clas}_area_{year}_m2"] * puma_data_it[f"frac_ff_sh_com_{year}"] ) # sum of previous areas to be used in fitting sum_areaff_dhw = sum(areas_ff_dhw_it) sum_areaff_other = sum(areas_ff_other_it) sum_areaff_cook = sum(areas_ff_cook_it) # Load monthly natural gas usage for the state natgas = ng_usage_data[clas][state] # Load annual fuel oil/kerosene and other gas/propane usage for the state fok = fok_usage_data.loc[state, f"fok.{clas}.mmbtu"] other = othergas_usage_data.loc[state, f"propane.{clas}.mmbtu"] totfuel = fok + other + natgas.sum() # Scale total fossil fuel usage by monthly natural gas ff_usage_data_it = totfuel * natgas / natgas.sum() # Fossil fuel average monthly mmbtu, normalized by hours in month ff_monthly_it = ff_usage_data_it / hours_in_month # Hourly heating degrees for all pumas in a given state, multiplied by their corresponding area and percent fossil fuel, summed up to one hourly list hd_hourly_it_sh = ( temps_pumas_transpose.applymap( lambda x: max(const.temp_ref[clas] - x, 0) ) .mul(areas_ff_sh_it, axis=0) .sum(axis=0) ) hd_monthly_it_sh = hd_hourly_it_sh.groupby(dti.month).mean() if clas == "res": hd_hourly_it_dhw = ( temps_pumas_transpose.applymap(lambda x: const.temp_ref[clas] - x) .mul(areas_ff_dhw_it, axis=0) .sum(axis=0) ) hd_monthly_it_dhw = hd_hourly_it_dhw.groupby(dti.month).mean() # Fitting function: Returns difference between fitted equation and actual fossil fuel usage for the least_squares function to minimize def func_r(par, sh, dhw, ff): err = hours_in_month ** (1 / 2) * ( ff - ( par[0] * sh + par[1] * (sum_areaff_dhw + const.dhw_lin_scalar * dhw) + par[2] * sum_areaff_other ) ) return err # Least squares solver lm_it = least_squares( func_r, const.bounds_lower_res, args=(hd_monthly_it_sh, hd_monthly_it_dhw, ff_monthly_it), bounds=(const.bounds_lower_res, const.bounds_upper_res), ) # Solved coefficients for slopes and constants par_sh_l = lm_it.x[0] par_dhw_c = lm_it.x[1] par_dhw_l = lm_it.x[1] * const.dhw_lin_scalar par_other_c = lm_it.x[2] corrected_residuals = np.array(lm_it.fun) / hours_in_month ** (1 / 2) r2 = calculate_r2(ff_monthly_it, corrected_residuals) # Add coefficients to output dataframe df_i = len(state_slopes_res) state_slopes_res.loc[df_i] = [ state, r2, par_sh_l, par_dhw_c, par_dhw_l, par_other_c, ] else: hd_hourly_it_other = ( temps_pumas_transpose.applymap( lambda x: max(x - const.temp_ref[clas], 0) ) .mul(areas_ff_other_it, axis=0) .sum(axis=0) ) hd_monthly_it_other = hd_hourly_it_other.groupby(dti.month).mean() bound_lower_consts_par = ( const.dhw_low_bound_com * sum_areaff_dhw + const.cook_c_scalar * const.dhw_low_bound_com * sum_areaff_cook ) / (sum_areaff_dhw + sum_areaff_cook + sum_areaff_other) bound_upper_consts_par = ( const.dhw_high_bound_com * sum_areaff_dhw + const.cook_c_scalar * const.dhw_high_bound_com * sum_areaff_cook + const.other_high_bound_com * sum_areaff_other ) / (sum_areaff_dhw + sum_areaff_cook + sum_areaff_other) bounds_lower_com = [0, bound_lower_consts_par, 0] bounds_upper_com = [np.inf, bound_upper_consts_par, np.inf] # Fitting function: Returns difference between fitted equation and actual fossil fuel usage for the least_squares function to minimize def func_c(par, sh, other, ff): err = hours_in_month ** (1 / 2) * ( ff - ( par[0] * sh + par[1] * (sum_areaff_dhw + sum_areaff_cook + sum_areaff_other) + par[2] * other ) ) return err # Least squares solver lm_it = least_squares( func_c, bounds_lower_com, args=(hd_monthly_it_sh, hd_monthly_it_other, ff_monthly_it), bounds=(bounds_lower_com, bounds_upper_com), ) # Solved dhw/cook/other constants consts_par = lm_it.x[1] bound_decision_point = ( consts_par * (sum_areaff_dhw + sum_areaff_cook + sum_areaff_other) / (sum_areaff_dhw + const.cook_c_scalar * sum_areaff_cook) ) if bound_decision_point <= const.dhw_high_bound_com: par_dhw_c = bound_decision_point par_other_c = 0 else: par_dhw_c = const.dhw_high_bound_com par_other_c = ( consts_par * (sum_areaff_dhw + sum_areaff_cook + sum_areaff_other) - ( const.dhw_high_bound_com * sum_areaff_dhw + const.cook_c_scalar * const.dhw_high_bound_com * sum_areaff_cook ) ) / sum_areaff_other par_cook_c = const.cook_c_scalar * par_dhw_c # Solved coefficients for slopes par_sh_l = lm_it.x[0] par_other_l = lm_it.x[2] corrected_residuals = np.array(lm_it.fun) / hours_in_month ** (1 / 2) r2 = calculate_r2(ff_monthly_it, corrected_residuals) # Add coefficients to output dataframe df_i = len(state_slopes_com) state_slopes_com.loc[df_i] = [ state, r2, par_sh_l, par_dhw_c, par_cook_c, par_other_c, par_other_l, ] # Export heating/hot water/cooking coefficients for each state return state_slopes_res, state_slopes_com
[docs]def adjust_puma_slopes( puma_data, state_slopes_res, state_slopes_com, year=const.base_year ): """Create per-puma slopes from per-state slopes. :param pandas.DataFrame puma_data: puma data. :param pandas.DataFrame state_slopes_res: residential state slopes. :param pandas.DataFrame state_slopes_com: commercial state slopes. :param int year: year of temperatures to download. :return: (*tuple*) -- a pair of pandas.DataFrame objects for per-puma residential and commercial slopes, respectively. """ # Minimize error between actual slopes and fitted function # Note for fitting to converge, hdd must be divided by 1000 and slopes in btu def model(par, hdd_div1000, slope_btu): err = ( slope_btu - (par[0] + par[1] * (1 - np.exp(-par[2] * hdd_div1000))) / hdd_div1000 ) return err # Functions with solved coefficients for res and com - produces slopes in btu/m2-C for inputs of HDD def func_slope_exp(x, a, b, c): return (a + b * (1 - np.exp(-c * (x / 1000))) / (x / 1000)) * 1e-6 classes = ["res", "com"] hd_col_names = {"res": f"hd_183C_{year}", "com": f"hd_167C_{year}"} state_slopes = { "res": state_slopes_res.set_index("state"), "com": state_slopes_com.set_index("state"), } puma_slopes = {clas: puma_data["state"].to_frame() for clas in classes} # Create data frames to hold output adj_slopes = {clas: puma_data["state"].to_frame() for clas in classes} for state in const.state_list: # Load puma temperatures temps_pumas = pd.read_csv( f"https://besciences.blob.core.windows.net/datasets/bldg_el/pumas/{year}/temps/temps_pumas_{state}_{year}.csv" ) # Hourly temperature difference below const.temp_ref_res/com for each puma for clas in classes: temp_diff = temps_pumas.applymap(lambda x: max(const.temp_ref[clas] - x, 0)) puma_data.loc[temp_diff.columns, hd_col_names[clas]] = temp_diff.sum() # Load in state groups consistent with building area scale adjustments data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") area_scale = { clas: pd.read_csv( os.path.join(data_dir, f"area_scale_{clas}.csv"), index_col=False ) for clas in classes } # Compute target year areas from the two survey years provided area_scale["res"][f"{year}"] = area_scale["res"][f"RECS{const.recs_date_1}"] * ( ( area_scale["res"][f"RECS{const.recs_date_2}"] / area_scale["res"][f"RECS{const.recs_date_1}"] ) ** ( (const.base_year - const.recs_date_1) / (const.recs_date_2 - const.recs_date_1) ) ) area_scale["com"][f"{year}"] = area_scale["com"][f"CBECS{const.cbecs_date_1}"] * ( ( area_scale["com"][f"CBECS{const.cbecs_date_2}"] / area_scale["com"][f"CBECS{const.cbecs_date_1}"] ) ** ( (const.base_year - const.cbecs_date_1) / (const.cbecs_date_2 - const.cbecs_date_1) ) ) for clas in classes: puma_slopes[clas]["htg_slope_mmbtu_m2_degC"] = puma_data["state"].map( state_slopes[clas]["sh_slope"] ) # Extract state groups from area_scale state_to_group = { elem: i for i, row in area_scale[clas].iterrows() for elem in row if isinstance(elem, str) } # Calculate population-weighted HDD and slopes state_puma_groupby = puma_data.groupby(puma_data["state"].map(state_to_group)) state_puma_slope_groupby = puma_slopes[clas].groupby( puma_data["state"].map(state_to_group) ) area_scale[clas]["hdd_normals_popwtd"] = [ (sum(data["hdd65_normals"] * data["pop"]) / data["pop"].sum()) for group, data in state_puma_groupby ] area_scale[clas]["htg_slope_mmbtu_m2_degC_pophddwtd"] = [ sum( state_puma_slope_groupby.get_group(group)["htg_slope_mmbtu_m2_degC"] * data["hdd65_normals"] * data["pop"] ) / sum(data["hdd65_normals"] * data["pop"]) for group, data in state_puma_groupby ] ls_args = ( # Divide by 1000 for robust solver np.array(area_scale[clas]["hdd_normals_popwtd"]) / 1000, np.array(area_scale[clas]["htg_slope_mmbtu_m2_degC_pophddwtd"]) * 10**6, ) ls = least_squares( model, [35, 35, 0.8], args=ls_args, method="trf", loss="soft_l1", f_scale=0.1, bounds=(0, [100, 100, 1]), ) slope_scalar = state_slopes[clas]["sh_slope"] / ( ( puma_data["hdd65_normals"].apply( func_slope_exp, args=(ls.x[0], ls.x[1], ls.x[2]) ) * puma_data[hd_col_names[clas]] * puma_data[f"{clas}_area_{year}_m2"] * puma_data[f"frac_ff_sh_{clas}_{year}"] ) .groupby(puma_data["state"]) .sum() / ( puma_data[hd_col_names[clas]] * puma_data[f"{clas}_area_{year}_m2"] * puma_data[f"frac_ff_sh_{clas}_{year}"] ) .groupby(puma_data["state"]) .sum() ) adj_slopes[clas][f"htg_slope_{clas}_mmbtu_m2_degC"] = puma_data["state"].map( slope_scalar ) * puma_data["hdd65_normals"].apply( func_slope_exp, args=(ls.x[0], ls.x[1], ls.x[2]) ) return adj_slopes["res"], adj_slopes["com"]
if __name__ == "__main__": data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") # Calculate and save state slopes state_slopes_res, state_slopes_com = calculate_state_slopes(const.puma_data) state_slopes_res.to_csv( os.path.join(data_dir, "state_slopes_ff_res.csv"), index=False ) state_slopes_com.to_csv( os.path.join(data_dir, "state_slopes_ff_com.csv"), index=False ) # Calculate and save puma slopes adj_slopes_res, adj_slopes_com = adjust_puma_slopes( const.puma_data, state_slopes_res, state_slopes_com ) adj_slopes_res.to_csv(os.path.join(data_dir, "puma_slopes_ff_res.csv")) adj_slopes_com.to_csv(os.path.join(data_dir, "puma_slopes_ff_com.csv"))