Source code for prereise.gather.demanddata.eia.map_ba

import inspect
import json
import os

import pandas as pd
import requests
from powersimdata.utility.distance import find_closest_neighbor
from tqdm import tqdm

import prereise


[docs]def aggregate_ba_demand(demand, mapping): """Aggregate demand in BAs to regions as defined in the mapping dictionary :param pandas.DataFrame demand: demand profiles in BAs. :param dict mapping: dictionary mapping of BA columns to regions. :return: (*pandas.DataFrame*) -- aggregated demand profiles """ agg_demand = pd.DataFrame(index=demand.index) for key in mapping: mapping_bas = mapping[key] valid_columns = list(set(mapping_bas) & set(demand.columns)) if len(valid_columns) < len(mapping_bas): print() print("******************************") print(f"Missing BA columns for {key}!") print(f"Original columns: {mapping_bas}") print("******************************") print(f"{key} regional demand was summed from {valid_columns}") print() if demand[valid_columns].shape[1] > 1: agg_demand[key] = demand[valid_columns].sum(axis=1) else: agg_demand[key] = demand[valid_columns] return agg_demand
[docs]def get_demand_in_loadzone(agg_demand, bus_map): """Get demand in loadzones from aggregated demand of BA regions. :param pandas.DataFrame agg_demand: demand profiles as returned by :py:func:`aggregate_ba_demand` :param pandas.DataFrame bus_map: data frame used to map BA regions to load zones using real power demand weighting. :return: (*pandas.DataFrame*) -- data frame with demand columns according to load zone. """ ba_agg = bus_map[["BA", "Pd"]].groupby("BA").sum().rename(columns={"Pd": "PdTotal"}) ba_scaling_factor = bus_map.merge(ba_agg, left_on="BA", right_on="BA") ba_scaling_factor = ba_scaling_factor.assign( zone_scaling=lambda x: x["Pd"] / x["PdTotal"] ) ba_grouped_scaling = ba_scaling_factor.groupby(["BA", "zone_name"]).sum() zone_demand = pd.DataFrame(index=agg_demand.index) for ba_name in ba_grouped_scaling.index.unique(level="BA").to_list(): zone_scaling_df = ba_grouped_scaling.loc[ba_name] for zone_name in zone_scaling_df.index.get_level_values(0).to_list(): if zone_name in zone_demand.columns.to_list(): zone_demand[zone_name] += ( zone_scaling_df.loc[zone_name, "zone_scaling"] * agg_demand[ba_name] ) else: zone_demand[zone_name] = ( zone_scaling_df.loc[zone_name, "zone_scaling"] * agg_demand[ba_name] ) return zone_demand
[docs]def map_buses_to_county(bus_county_map): """Find the county in the U.S. territory that each bus in the query grid belongs to. :param pandas.DataFrame bus_county_map: data frame contains a list of entries with lat and long. :return: (*tuple*) -- first element is a data frame of counties that buses locate. Second element is a list of bus indices that no county matches. """ # api-endpoint url = "https://geo.fcc.gov/api/census/block/find" # defining a params dict for the parameters to be sent to the API bus_county_map.loc[:, "County"] = None bus_county_map.loc[:, "BA"] = None bus_no_county_match = [] for index, row in tqdm(bus_county_map.iterrows(), total=len(bus_county_map)): params = { "latitude": row["lat"], "longitude": row["lon"], "format": "json", "showall": True, } # sending get request and saving the response as response object r = requests.get(url=url, params=params).json() try: county_name = r["County"]["name"] + "__" + r["State"]["code"] bus_county_map.loc[index, "County"] = county_name except TypeError: bus_no_county_match.append(index) return bus_county_map, bus_no_county_match
[docs]def map_buses_to_ba(bus_df): """Find the Balancing Authority in the U.S. territory that each query bus belongs to based on GIS information. :param (*pandas.DataFrame*) bus_df: data frame contains a list of entries with lat and long of buses. :return: (*tuple*) -- the first entry is the input data frame with two columns, "County" and "BA", added for each bus and the second entry is the list of bus indices that no county matches based on Census API (counties of such buses are assigned based on its nearest neighbour). """ bus_ba_map, bus_no_county_match = map_buses_to_county(bus_df) # hard coded fix for county name mismatch between the returns from census API and # BA_County_map.json bus_ba_map.County.replace( { "Danville City__VA": "Danville__VA", "Harrisonburg City__VA": "Harrisonburg__VA", "Franklin City__VA": "Franklin__VA", }, inplace=True, ) # read json file for BA_County Map filepath = os.path.join( os.path.dirname(inspect.getfile(prereise)), "gather", "data", "BA_County_map.json", ) with open(filepath) as f: ba_county_map = json.load(f) county_ba_map = { value: key for key in ba_county_map for value in ba_county_map[key] } bus_ba_map["BA"] = bus_ba_map["County"].map(county_ba_map) # assign BA to buses without county assigned based on the nearest neighbor neighbor = bus_ba_map.query("~County.isna()") for bus_id in bus_no_county_match: if neighbor.empty: print("No reference bus has BA assigned!") break ind = find_closest_neighbor( (bus_ba_map.loc[bus_id, "lon"], bus_ba_map.loc[bus_id, "lat"]), neighbor[["lon", "lat"]].values.tolist(), ) bus_ba_map.loc[bus_id, "BA"] = neighbor.iloc[ind]["BA"] return bus_ba_map, bus_no_county_match