Source code for prereise.gather.flexibilitydata.doe.geo_data

import os
import pickle as pkl
import urllib.request

import numpy as np
import pandas as pd


def get_census_data(download_path):
    """Download county population data from USA Census website

    :param str download_path: folder to store the downloaded file
    """
    os.makedirs(download_path, exist_ok=True)
    census_file_link = "https://www2.census.gov/programs-surveys/popest/datasets/2010-2020/counties/totals/co-est2020-alldata.csv"
    urllib.request.urlretrieve(
        census_file_link, os.path.join(download_path, "county_population.csv")
    )
def get_crosswalk_data(download_path):
    """Download FIPS-ZIP crosswalk data from USPS and convert to csv

    :param str download_path: folder to store the downloaded file
    """
    # download
    os.makedirs(download_path, exist_ok=True)
    usps_file_link = (
        "https://www.huduser.gov/portal/datasets/usps/COUNTY_ZIP_122021.xlsx"
    )
    urllib.request.urlretrieve(
        usps_file_link, os.path.join(download_path, "county_to_zip.xlsx")
    )

    # convert to csv
    usps_df = pd.read_excel(os.path.join(download_path, "county_to_zip.xlsx"))
    usps_df = usps_df.sort_values(by=["county", "zip"])
    usps_df.to_csv(os.path.join(download_path, "county_to_zip.csv"), index=False)
    os.remove(os.path.join(download_path, "county_to_zip.xlsx"))
def get_lse_region_data(download_path):
    """Download LSE service region data

    :param str download_path: folder to store the downloaded file
    """
    os.makedirs(download_path, exist_ok=True)
    iou_file_link = "https://data.openei.org/files/4042/iou_zipcodes_2019.csv"
    niou_file_link = "https://data.openei.org/files/4042/non_iou_zipcodes_2019.csv"

    urllib.request.urlretrieve(
        iou_file_link, os.path.join(download_path, "iou_zipcodes_2019.csv")
    )
    urllib.request.urlretrieve(
        niou_file_link, os.path.join(download_path, "non_iou_zipcodes_2019.csv")
    )
def get_county_fips_data(download_path):
    """Download county FIPS data

    :param str download_path: folder to store the downloaded file
    """
    os.makedirs(download_path, exist_ok=True)
    county_fips_link = (
        "https://github.com/kjhealy/fips-codes/raw/master/county_fips_master.csv"
    )
    urllib.request.urlretrieve(
        county_fips_link, os.path.join(download_path, "county_fips_master.csv")
    )
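The four download helpers above are independent of each other; a minimal usage sketch, assuming a scratch folder name that is not part of this module:

raw_data_path = "./raw_data"  # hypothetical folder, created if missing
get_census_data(raw_data_path)
get_crosswalk_data(raw_data_path)
get_lse_region_data(raw_data_path)
get_county_fips_data(raw_data_path)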
def zip_to_eiaid(raw_data_path, cache_path):
    """Find the LSE of all ZIP codes listed in the EIA LSE service region data

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(raw_data_path, "iou_zipcodes_2019.csv")
    ), "Input file iou_zipcodes_2019.csv does not exist."
    assert os.path.isfile(
        os.path.join(raw_data_path, "non_iou_zipcodes_2019.csv")
    ), "Input file non_iou_zipcodes_2019.csv does not exist."

    iou_df = pd.read_csv(os.path.join(raw_data_path, "iou_zipcodes_2019.csv"))
    niou_df = pd.read_csv(os.path.join(raw_data_path, "non_iou_zipcodes_2019.csv"))

    all_zips = set(iou_df["zip"].to_list() + niou_df["zip"].to_list())

    zip2id = {}
    for i in all_zips:
        # an LSE serving this ZIP may appear in the IOU table, the non-IOU
        # table, or both
        ioulses = iou_df.loc[iou_df["zip"] == i]["eiaid"].to_list()
        nioulses = niou_df.loc[niou_df["zip"] == i]["eiaid"].to_list()
        zip2id[i] = set(ioulses + nioulses)

    with open(os.path.join(cache_path, "zip2eiaid.pkl"), "wb") as fh:
        pkl.dump(zip2id, fh)
def fips_to_eiaid(raw_data_path, cache_path):
    """Find the corresponding LSE for all counties identified by their FIPS number

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(cache_path, "zip2eiaid.pkl")
    ), "Cached file zip2eiaid.pkl does not exist."
    assert os.path.isfile(
        os.path.join(cache_path, "zip2fips.pkl")
    ), "Cached file zip2fips.pkl does not exist."
    assert os.path.isfile(
        os.path.join(cache_path, "fips_population.pkl")
    ), "Cached file fips_population.pkl does not exist."

    # load cached data
    with open(os.path.join(cache_path, "zip2eiaid.pkl"), "rb") as fh:
        zip2eiaid = pkl.load(fh)
    with open(os.path.join(cache_path, "zip2fips.pkl"), "rb") as fh:
        zip2fips = pkl.load(fh)

    fips2eiaid = {}
    for key, lses in zip2eiaid.items():
        # all FIPS codes for this ZIP code; skip ZIPs missing from the crosswalk
        try:
            fipss, _ = zip2fips[key]
        except KeyError:
            continue

        # map each county to the union of LSEs serving it
        for f in fipss:
            if f in fips2eiaid:
                fips2eiaid[f].update(lses)
            else:
                # copy the set so later updates do not mutate zip2eiaid entries
                fips2eiaid[f] = set(lses)

    with open(os.path.join(cache_path, "fips2eiaid.pkl"), "wb") as fh:
        pkl.dump(fips2eiaid, fh)
def eiaid_to_zip(raw_data_path, cache_path):
    """Find the service region (set of ZIP codes) for every LSE identified by its EIA ID

    Create a dictionary in the cache folder with EIA IDs as keys and sets of ZIP
    codes as values

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(raw_data_path, "iou_zipcodes_2019.csv")
    ), "Input file iou_zipcodes_2019.csv does not exist."
    assert os.path.isfile(
        os.path.join(raw_data_path, "non_iou_zipcodes_2019.csv")
    ), "Input file non_iou_zipcodes_2019.csv does not exist."

    iou_df = pd.read_csv(os.path.join(raw_data_path, "iou_zipcodes_2019.csv"))
    niou_df = pd.read_csv(os.path.join(raw_data_path, "non_iou_zipcodes_2019.csv"))

    all_ids = set(iou_df["eiaid"].to_list() + niou_df["eiaid"].to_list())

    id2zip = {}
    for i in all_ids:
        iouzips = iou_df.loc[iou_df["eiaid"] == i]["zip"].to_list()
        niouzips = niou_df.loc[niou_df["eiaid"] == i]["zip"].to_list()
        id2zip[i] = set(iouzips + niouzips)

    with open(os.path.join(cache_path, "eiaid2zip.pkl"), "wb") as fh:
        pkl.dump(id2zip, fh)
def eiaid_to_fips(raw_data_path, cache_path):
    """Find the service region (list of FIPS codes) for every LSE identified by its EIA ID

    Create a dictionary in the cache folder with EIA IDs as keys and a pair of
    lists (FIPS codes, served populations) as values

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(cache_path, "eiaid2zip.pkl")
    ), "Cached file eiaid2zip.pkl does not exist."
    assert os.path.isfile(
        os.path.join(cache_path, "zip2fips.pkl")
    ), "Cached file zip2fips.pkl does not exist."
    assert os.path.isfile(
        os.path.join(cache_path, "fips_population.pkl")
    ), "Cached file fips_population.pkl does not exist."

    # load cached data
    with open(os.path.join(cache_path, "eiaid2zip.pkl"), "rb") as fh:
        eiaid2zip = pkl.load(fh)
    with open(os.path.join(cache_path, "zip2fips.pkl"), "rb") as fh:
        zip2fips = pkl.load(fh)
    with open(os.path.join(cache_path, "fips_population.pkl"), "rb") as fh:
        fips_pop_df = pkl.load(fh)

    eiaid2fips = {}
    for key, zips in eiaid2zip.items():
        all_fips = []
        all_pops = []
        # aggregate covered counties over all ZIP codes of this LSE
        for j in zips:
            try:
                fipss, weights = zip2fips[j]
            except KeyError:
                continue

            for k in range(len(fipss)):
                # total population in this county
                total_pops = fips_pop_df.loc[
                    fips_pop_df["fips"] == fipss[k], "population"
                ].to_list()[0]
                if fipss[k] in all_fips:
                    # add the population share, not the bare crosswalk ratio
                    all_pops[all_fips.index(fipss[k])] += weights[k] * total_pops
                else:
                    all_fips.append(fipss[k])
                    all_pops.append(weights[k] * total_pops)
        eiaid2fips[key] = [all_fips, all_pops]

    with open(os.path.join(cache_path, "eiaid2fips.pkl"), "wb") as fh:
        pkl.dump(eiaid2fips, fh)
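A short sketch of reading back the eiaid2fips.pkl cache written above; the cache folder name is an assumption, not a module default:

with open(os.path.join("./cache", "eiaid2fips.pkl"), "rb") as fh:  # assumed path
    eiaid2fips = pkl.load(fh)

eia_id = next(iter(eiaid2fips))  # pick any cached LSE
fips_list, pop_list = eiaid2fips[eia_id]
# fips_list: counties the LSE serves; pop_list: population served in each county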
def fips_zip_conversion(raw_data_path, cache_path):
    """Create a two-way mapping between all ZIP and FIPS codes in the crosswalk data
    and save the mappings as dictionary files for future use

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(raw_data_path, "county_to_zip.csv")
    ), "Input file county_to_zip.csv does not exist."

    df_raw = pd.read_csv(os.path.join(raw_data_path, "county_to_zip.csv"))
    all_fips = df_raw["county"].astype("int32")
    all_zip = df_raw["zip"].astype("int32")
    all_weights = df_raw["tot_ratio"]

    # create ZIP -> counties mapping
    zip2fips = {}
    for i in pd.unique(all_zip):
        idx = all_zip.index[all_zip == i]
        cty = all_fips[idx].tolist()
        wgt = all_weights[idx].tolist()
        zip2fips.update({i: (cty, wgt)})

    with open(os.path.join(cache_path, "zip2fips.pkl"), "wb") as fh:
        pkl.dump(zip2fips, fh)

    # create county -> ZIPs mapping
    fips2zip = {}
    for i in pd.unique(all_fips):
        idx = all_fips.index[all_fips == i]
        zips = all_zip[idx].tolist()
        wgt = all_weights[idx].tolist()
        fips2zip.update({i: (zips, wgt)})

    with open(os.path.join(cache_path, "fips2zip.pkl"), "wb") as fh:
        pkl.dump(fips2zip, fh)
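A sketch of the structure of the zip2fips.pkl cache produced here; the folder name is an assumption:

with open(os.path.join("./cache", "zip2fips.pkl"), "rb") as fh:  # assumed path
    zip2fips = pkl.load(fh)

z = next(iter(zip2fips))  # pick any cached ZIP code
fipss, weights = zip2fips[z]
# parallel lists: county FIPS codes, and the tot_ratio share of the ZIP's
# addresses falling in each county (the shares of one ZIP sum to roughly 1)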
def get_fips_population(raw_data_path, cache_path):
    """Match county population and county FIPS data to produce a concise FIPS-population
    table, saved to the cache folder as a data frame with one row per 5-digit FIPS code

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(raw_data_path, "county_population.csv")
    ), "Input file county_population.csv does not exist."
    assert os.path.isfile(
        os.path.join(raw_data_path, "county_fips_master.csv")
    ), "Input file county_fips_master.csv does not exist."

    cty_pop_df = pd.read_csv(
        os.path.join(raw_data_path, "county_population.csv"), encoding="cp1252"
    )
    cty_name_df = pd.read_csv(
        os.path.join(raw_data_path, "county_fips_master.csv"), encoding="cp1252"
    )

    pops = np.zeros(len(cty_name_df.index))
    for cty in cty_name_df.index:
        name = cty_name_df["county_name"][cty]
        state = cty_name_df["state_name"][cty]
        tmp = cty_pop_df.loc[
            (cty_pop_df["CTYNAME"] == name) & (cty_pop_df["STNAME"] == state),
            "POPESTIMATE2019",
        ]
        # default to 0 if no matching county; take the first match otherwise
        pops[cty] = 0 if len(tmp) == 0 else tmp.to_list()[0]

    cty_name_df["population"] = pops
    new_df = cty_name_df.loc[:, ["fips", "county_name", "population"]]

    with open(os.path.join(cache_path, "fips_population.pkl"), "wb") as fh:
        pkl.dump(new_df, fh)
def get_zip_population(raw_data_path, cache_path):
    """Compute the population of each ZIP code using percentage share and FIPS population,
    saved to the cache folder as a dictionary keyed by ZIP code

    :param str raw_data_path: folder that contains downloaded raw data
    :param str cache_path: folder to store processed cache files
    """
    assert os.path.isfile(
        os.path.join(cache_path, "fips_population.pkl")
    ), "Cached file fips_population.pkl does not exist."
    assert os.path.isfile(
        os.path.join(cache_path, "zip2fips.pkl")
    ), "Cached file zip2fips.pkl does not exist."

    with open(os.path.join(cache_path, "fips_population.pkl"), "rb") as fh:
        fips_pop_df = pkl.load(fh)
    with open(os.path.join(cache_path, "zip2fips.pkl"), "rb") as fh:
        zip2fips = pkl.load(fh)

    zip_pops = {}
    for z in sorted(zip2fips.keys()):
        fipss, pcts = zip2fips[z]
        zip_pops[z] = 0
        for f in range(len(fipss)):
            tmp = fips_pop_df.loc[fips_pop_df["fips"] == fipss[f], "population"].values
            # skip counties missing from the population table
            if len(tmp) > 0:
                zip_pops[z] += tmp[0] * pcts[f]

    with open(os.path.join(cache_path, "zip_population.pkl"), "wb") as fh:
        pkl.dump(zip_pops, fh)
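With the raw files downloaded as in the sketch after get_county_fips_data, the caches can be built in dependency order, since each step reads only files produced by earlier ones. A minimal driver sketch with assumed folder names:

raw_data_path = "./raw_data"  # hypothetical folders, not module defaults
cache_path = "./cache"
os.makedirs(cache_path, exist_ok=True)

fips_zip_conversion(raw_data_path, cache_path)  # zip2fips.pkl, fips2zip.pkl
get_fips_population(raw_data_path, cache_path)  # fips_population.pkl
zip_to_eiaid(raw_data_path, cache_path)  # zip2eiaid.pkl
eiaid_to_zip(raw_data_path, cache_path)  # eiaid2zip.pkl
fips_to_eiaid(raw_data_path, cache_path)  # fips2eiaid.pkl
eiaid_to_fips(raw_data_path, cache_path)  # eiaid2fips.pkl
get_zip_population(raw_data_path, cache_path)  # zip_population.pkl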