Source code for powersimdata.design.investment.investment_costs

import warnings

import numpy as np
import pandas as pd

from powersimdata.design.compare.generation import calculate_plant_difference
from powersimdata.design.compare.transmission import (
    calculate_branch_difference,
    calculate_dcline_difference,
)
from powersimdata.design.investment import const
from powersimdata.design.investment.create_mapping_files import (
    bus_to_neem_reg,
    bus_to_reeds_reg,
)
from powersimdata.design.investment.inflation import calculate_inflation
from powersimdata.input.check import _check_grid_models_match
from powersimdata.utility.distance import haversine


[docs]def merge_keep_index(df1, df2, **kwargs): """Merge ``df1`` and ``df2``, preserving the index of the first dataframe. :param pandas.DataFrame df1: first data frame, to call pandas ``merge()`` from. :param pandas.DataFrame df2: second data frame, argument to pandas ``merge()``. :param \\*\\*kwargs: arbitrary keyword arguments passed to pandas ``merge()``. :return: (*pandas.DataFrame*) -- ``df1`` merged with ``df2`` with indices preserved. """ return df1.reset_index().merge(df2, **kwargs).set_index(df1.index.names)
[docs]def append_keep_index_name(df1, df2, *args, **kwargs): """Append ``df2`` to ``df1``, preserving the index name of the first data frame. :param pandas.DataFrame df1: first data frame. :param pandas.DataFrame df2: second data frame. :param \\*args: arbitrary positional arguments passed to pandas ``concat()``. :param \\*\\*kwargs: arbitrary keyword arguments passed to pandas ``concat()``. :return: (*pandas.DataFrame*) -- ``df2`` appended to ``df1`` with index name set to name of ``df1`` index. """ df = pd.concat([df1, df2], *args, **kwargs) df.index.name = df1.index.name return df
[docs]def calculate_ac_inv_costs( scenario, sum_results=True, exclude_branches=None, base_grid=None, ): """Calculate cost of upgrading AC lines and/or transformers in a scenario. NEEM regions are used to find regional multipliers. :param powersimdata.scenario.scenario.Scenario scenario: scenario instance. :param bool sum_results: whether to sum data frame for each branch type. Defaults to True. :param powersimdata.input.grid.Grid base_grid: a Grid to compare against. If None, the grid model and interconnect from the ``scenario`` are used to instantiate a corresponding unmodified Grid. :return: (*dict*) -- keys are {'line_cost', 'transformer_cost'}, values are either float if ``sum_results``, or pandas Series indexed by branch ID. Whether summed or not, values are $USD, inflation-adjusted to today. """ grid_differences = scenario.get_grid() if base_grid is None: base_grid = scenario.get_base_grid() else: _check_grid_models_match(base_grid, grid_differences) # find upgraded AC lines capacity_difference = calculate_branch_difference( base_grid.branch, grid_differences.branch ) grid_differences.branch = grid_differences.branch.assign( rateA=capacity_difference["diff"].to_numpy() ) grid_differences.branch = grid_differences.branch.query("rateA != 0.0") if exclude_branches is not None: present_exclude_branches = set(exclude_branches) & set( grid_differences.branch.index ) grid_differences.branch.drop(index=present_exclude_branches, inplace=True) costs = _calculate_ac_inv_costs(grid_differences, sum_results) return costs
def _calculate_ac_inv_costs(grid_new, sum_results=True): """Calculate cost of upgrading AC lines and/or transformers. NEEM regions are used to find regional multipliers. Note that a transformer winding is considered as a transformer. :param powersimdata.input.grid.Grid grid_new: grid instance. :param bool sum_results: whether to sum data frame for each branch type. Defaults to True. :return: (*dict*) -- keys are {'line_cost', 'transformer_cost'}, values are either float if ``sum_results``, or pandas Series indexed by branch ID. Whether summed or not, values are $USD, inflation-adjusted to today. """ def select_mw(x, cost_df): """Determine the closest kV/MW combination for a single branch and return the corresponding cost (in $/MW-mi). :param pandas.Series x: data for a single branch :param pandas.DataFrame cost_df: data frame with *'kV'*, *'MW'*, *'costMWmi'* as columns :return: (*pandas.Series*) -- series of [*'MW'*, *'costMWmi'*] to be assigned to branch. """ underground_regions = ("NEISO", "NYISO J-K") filtered_cost_df = cost_df.copy() # Unless we are entirely within an underground region, drop this cost class if not (x.from_region == x.to_region and x.from_region in underground_regions): filtered_cost_df = filtered_cost_df.query("kV != 345 or MW != 500") # select corresponding cost table of selected kV filtered_cost_df = filtered_cost_df[filtered_cost_df["kV"] == x.kV] # get rid of NaN values in this kV table filtered_cost_df = filtered_cost_df[~filtered_cost_df["MW"].isna()] # find closest MW & corresponding cost filtered_cost_df = filtered_cost_df.iloc[ np.argmin(np.abs(filtered_cost_df["MW"] - x.rateA)) ] return filtered_cost_df.loc[["MW", "costMWmi"]] def get_branch_mult(x, bus_reg, ac_reg_mult, branch_lookup_alerted=set()): """Determine the regional multiplier based on kV and power (closest). :param pandas.Series x: data for a single branch. :param pandas.DataFrame bus_reg: data frame with bus regions. :param pandas.DataFrame ac_reg_mult: data frame with regional multipliers. :param set branch_lookup_alerted: set of (voltage, region) tuples for which a message has already been printed that this lookup was not found. :return: (*float*) -- regional multiplier. """ # Select the highest voltage for transformers (branch end voltages should match) max_kV = bus.loc[[x.from_bus_id, x.to_bus_id], "baseKV"].max() # noqa: N806 # Average the multipliers for branches (transformer regions should match) regions = (x.from_region, x.to_region) region_mults = ac_reg_mult.loc[ac_reg_mult.name_abbr.isin(regions)] region_mults = ( region_mults.groupby(["kV", "MW"]).mean(numeric_only=True).reset_index() ) mult_lookup_kV = region_mults.loc[ # noqa: N806 (region_mults.kV - max_kV).abs().idxmin() ].kV region_kV_mults = region_mults[region_mults.kV == mult_lookup_kV] # noqa: N806 region_kV_mults = region_kV_mults.loc[ # noqa: N806 ~region_kV_mults.mult.isnull() ] if len(region_kV_mults) == 0: mult = 1 if (mult_lookup_kV, regions) not in branch_lookup_alerted: print(f"No multiplier for voltage {mult_lookup_kV} in {regions}") branch_lookup_alerted.add((mult_lookup_kV, regions)) else: mult_lookup_MW = region_kV_mults.loc[ # noqa: N806 (region_kV_mults.MW - x.rateA).abs().idxmin(), "MW" ] mult = ( region_kV_mults.loc[region_kV_mults.MW == mult_lookup_MW].squeeze().mult ) return mult # import data ac_cost = pd.DataFrame(const.ac_line_cost) ac_reg_mult = pd.read_csv(const.ac_reg_mult_path) ac_reg_mult = ac_reg_mult.melt( id_vars=["kV", "MW"], var_name="name_abbr", value_name="mult" ) try: bus_reg = pd.read_csv(const.bus_neem_regions_path, index_col="bus_id") except FileNotFoundError: bus_reg = bus_to_neem_reg(grid_new.bus) bus_reg.sort_index().to_csv(const.bus_neem_regions_path) xfmr_cost = pd.read_csv(const.transformer_cost_path, index_col=0).fillna(0) xfmr_cost.columns = [int(c) for c in xfmr_cost.columns] # Mirror across diagonal xfmr_cost += xfmr_cost.to_numpy().T - np.diag(np.diag(xfmr_cost.to_numpy())) # check that all buses included in this file and lat/long values match, # otherwise re-run mapping script on mis-matching buses. These buses are missing # in region file bus = grid_new.bus mapped_buses = bus.query("index in @bus_reg.index") missing_bus_indices = set(bus.index) - set(bus_reg.index) mapped_buses = merge_keep_index(mapped_buses, bus_reg, how="left", on="bus_id") # these buses have incorrect lat/lon values in the region mapping file. # re-running the region mapping script on those buses only. misaligned_bus_indices = mapped_buses[ ~np.isclose(mapped_buses.lat_x, mapped_buses.lat_y) | ~np.isclose(mapped_buses.lon_x, mapped_buses.lon_y) ].index all_buses_to_fix = set(missing_bus_indices) | set(misaligned_bus_indices) # fix the identified buses, if necessary if len(all_buses_to_fix) > 0: bus_fix = bus_to_neem_reg(bus.query("index in @all_buses_to_fix")) fix_cols = ["name_abbr", "lat", "lon"] corrected_bus_mappings = bus_fix.loc[misaligned_bus_indices, fix_cols] new_bus_mappings = bus_fix.loc[missing_bus_indices, fix_cols] bus_reg.loc[misaligned_bus_indices, fix_cols] = corrected_bus_mappings bus_reg = append_keep_index_name(bus_reg, new_bus_mappings) bus_reg.drop(["lat", "lon"], axis=1, inplace=True) # Add extra information to branch data frame branch = grid_new.branch branch.loc[:, "kV"] = bus.loc[branch.from_bus_id, "baseKV"].tolist() branch.loc[:, "from_region"] = bus_reg.loc[branch.from_bus_id, "name_abbr"].tolist() branch.loc[:, "to_region"] = bus_reg.loc[branch.to_bus_id, "name_abbr"].tolist() # separate transformers and lines t_mask = branch["branch_device_type"].isin(["Transformer", "TransformerWinding"]) transformers = branch[t_mask].copy() lines = branch[~t_mask].copy() if len(lines) > 0: # Find closest kV rating lines.loc[:, "kV"] = lines.apply( lambda x: ac_cost.loc[(ac_cost["kV"] - x.kV).abs().idxmin(), "kV"], axis=1, ) lines[["MW", "costMWmi"]] = lines.apply(lambda x: select_mw(x, ac_cost), axis=1) lines["mult"] = lines.apply( lambda x: get_branch_mult(x, bus_reg, ac_reg_mult), axis=1 ) # calculate MWmi lines.loc[:, "lengthMi"] = lines.apply( lambda x: haversine((x.from_lat, x.from_lon), (x.to_lat, x.to_lon)), axis=1 ) else: new_columns = ["kV", "MW", "costMWmi", "mult", "lengthMi"] lines = lines.reindex(columns=[*lines.columns.tolist(), *new_columns]) lines.loc[:, "MWmi"] = lines["lengthMi"] * lines["rateA"] # calculate cost of each line lines.loc[:, "cost"] = lines["MWmi"] * lines["costMWmi"] * lines["mult"] # calculate transformer costs if len(transformers) > 0: from_to_kv = [ xfmr_cost.index.get_indexer( bus.loc[transformers[e], "baseKV"], method="nearest" ) for e in ["from_bus_id", "to_bus_id"] ] transformers["per_MW_cost"] = [ xfmr_cost.iloc[f, t] for f, t in zip(from_to_kv[0], from_to_kv[1]) ] transformers["mult"] = transformers.apply( lambda x: get_branch_mult(x, bus_reg, ac_reg_mult), axis=1 ) else: # Properly handle case with no transformers, where apply returns wrong dims transformers["per_MW_cost"] = [] transformers["mult"] = [] transformers["cost"] = ( transformers["rateA"] * transformers["per_MW_cost"] * transformers["mult"] ) lines.cost *= calculate_inflation(2010) transformers.cost *= calculate_inflation(2020) if sum_results: return { "line_cost": lines.cost.sum(), "transformer_cost": transformers.cost.sum(), } else: return {"line_cost": lines.cost, "transformer_cost": transformers.cost}
[docs]def calculate_dc_inv_costs(scenario, sum_results=True, base_grid=None): """Calculate cost of upgrading HVDC lines in a scenario. :param powersimdata.scenario.scenario.Scenario scenario: scenario instance. :param bool sum_results: whether to sum series to return total cost. Defaults to True. :param powersimdata.input.grid.Grid base_grid: a Grid to compare against. If None, the grid model and interconnect from the ``scenario`` are used to instantiate a corresponding unmodified Grid. :return: (*pandas.Series/float*) -- cost of upgrading HVDC lines, in $USD, inflation-adjusted to today. If ``sum_results``, a float is returned, otherwise a Series. """ grid_differences = scenario.get_grid() if base_grid is None: base_grid = scenario.get_base_grid() else: _check_grid_models_match(base_grid, grid_differences) # find upgraded DC lines capacity_difference = calculate_dcline_difference(base_grid, grid_differences) grid_differences.dcline = grid_differences.dcline.assign( Pmax=capacity_difference["diff"].to_numpy() ) grid_differences.dcline = grid_differences.dcline.query("Pmax != 0.0") costs = _calculate_dc_inv_costs(grid_differences, sum_results) return costs
def _calculate_dc_inv_costs(grid_new, sum_results=True): """Calculate cost of upgrading HVDC lines. :param powersimdata.input.grid.Grid grid_new: grid instance. :param bool sum_results: whether to sum series to return total cost. Defaults to True. :return: (*pandas.Series/float*) -- cost of upgrading HVDC lines, in $USD, inflation-adjusted to today. If ``sum_results``, a float is returned, otherwise a Series. """ def _calculate_single_line_cost(line, bus): """Calculate cost of upgrading a single HVDC line. :param pandas.Series line: HVDC line series featuring *'from_bus_id'*', *'to_bus_id'* and *'Pmax'*. :param pandas.Dataframe bus: bus data frame featuring *'lat'*, *'lon'*. :return: (*float*) -- HVDC line upgrade cost in $2015. """ # Calculate distance from_lat = bus.loc[line.from_bus_id, "lat"] from_lon = bus.loc[line.from_bus_id, "lon"] to_lat = bus.loc[line.to_bus_id, "lat"] to_lon = bus.loc[line.to_bus_id, "lon"] miles = haversine((from_lat, from_lon), (to_lat, to_lon)) # Calculate cost total_cost = line.Pmax * ( miles * const.hvdc_line_cost["costMWmi"] * calculate_inflation(2015) + 2 * const.hvdc_terminal_cost_per_MW * calculate_inflation(2020) ) return total_cost bus = grid_new.bus dcline = grid_new.dcline # if any dclines, do calculations, otherwise, return 0 costs. if len(dcline != 0): dcline_costs = dcline.apply(_calculate_single_line_cost, args=(bus,), axis=1) if sum_results: return dcline_costs.sum() else: return dcline_costs else: return 0.0
[docs]def calculate_gen_inv_costs( scenario, year, cost_case, sum_results=True, base_grid=None, ): """Calculate cost of upgrading generators in a scenario. ReEDS regions are used to find regional multipliers. :param powersimdata.scenario.scenario.Scenario scenario: scenario instance. :param int/str year: building year. :param str cost_case: ATB cost case of data. *'Moderate'*: mid cost case, *'Conservative'*: generally higher costs, *'Advanced'*: generally lower costs :param bool sum_results: whether to sum data frame for plant costs. Defaults to True. :param powersimdata.input.grid.Grid base_grid: a Grid to compare against. If None, the grid model and interconnect from the ``scenario`` are used to instantiate a corresponding unmodified Grid. :return: (*pandas.Series*) -- Overnight generation investment cost. If ``sum_results``, indices are technologies and values are total cost. Otherwise, indices are IDs of plants (including storage, which is given pseudo-plant-IDs), and values are individual generator costs. Whether summed or not, values are $USD, inflation-adjusted to today. .. todo:: it currently uses one (arbitrary) sub-technology. The rest of the costs are dropped. Wind and solar will need to be fixed based on the resource supply curves. """ grid_differences = scenario.get_grid() if base_grid is None: base_grid = scenario.get_base_grid() else: _check_grid_models_match(base_grid, grid_differences) # Find change in generation capacity capacity_difference = calculate_plant_difference( base_grid.plant, grid_differences.plant ) grid_differences.plant = grid_differences.plant.assign( Pmax=capacity_difference["diff"].to_numpy() ) grid_differences.plant = grid_differences.plant.query("Pmax >= 0.01") # Find change in storage capacity # Reindex so that we don't get NaN when calculating upgrades for new storage base_grid.storage["gen"] = base_grid.storage["gen"].reindex( grid_differences.storage["gen"].index, fill_value=0 ) grid_differences.storage["gen"].Pmax = ( grid_differences.storage["gen"].Pmax - base_grid.storage["gen"].Pmax ) grid_differences.storage["gen"]["type"] = "storage" costs = _calculate_gen_inv_costs(grid_differences, year, cost_case, sum_results) return costs
def _calculate_gen_inv_costs(grid_new, year, cost_case, sum_results=True): """Calculate cost of upgrading generators. ReEDS regions are used to find regional multipliers. :param powersimdata.input.grid.Grid grid_new: grid instance. :param int/str year: year of builds. :param str cost_case: ATB cost case of data. *'Moderate'*: mid cost case *'Conservative'*: generally higher costs, *'Advanced'*: generally lower costs. :raises ValueError: if year not 2020 - 2050, or cost case not an allowed option. :raises TypeError: if year not int/str or cost_case not str. :param bool sum_results: whether to sum data frame for plant costs. Defaults to True. :return: (*pandas.Series*) -- Overnight generation investment cost. If ``sum_results``, indices are technologies and values are total cost. Otherwise, indices are IDs of plants (including storage, which is given pseudo-plant-IDs), and values are individual generator costs. Whether summed or not, values are $USD, inflation-adjusted to today. .. note:: the function computes the total capital cost as: CAPEX_total = overnight CAPEX ($/MW) * Power capacity (MW) * regional multiplier """ def load_cost(year, cost_case): """Load in base costs from NREL's 2020 ATB for generation technologies (CAPEX). :param int/str year: year of cost projections. :param str cost_case: ATB cost case of data (see :return: (*pandas.DataFrame*) -- cost by technology/subtype in $2018. .. todo:: it can be adapted in the future for FOM, VOM, & CAPEX. This data is pulled from the ATB xlsx file summary pages. Therefore, it currently uses default financials, but will want to create custom financial functions in the future. """ cost = pd.read_csv(const.gen_inv_cost_path) cost = cost.dropna(axis=0, how="all") # drop non-useful columns cols_drop = cost.columns[ ~cost.columns.isin( [str(x) for x in cost.columns[0:6]] + ["Metric", str(year)] ) ] cost.drop(cols_drop, axis=1, inplace=True) # rename year of interest column cost.rename(columns={str(year): "value"}, inplace=True) # get rid of #refs cost.drop(cost[cost["value"] == "#REF!"].index, inplace=True) # get rid of $s, commas cost["value"] = cost["value"].str.replace("$", "", regex=True) cost["value"] = cost["value"].str.replace(",", "", regex=True).astype("float64") # scale from $/kW to $/MW cost["value"] *= 1000 cost.rename(columns={"value": "CAPEX"}, inplace=True) # select scenario of interest if cost_case != "Moderate": # The 2020 ATB only has "Moderate" for nuclear, so we need to make due. warnings.warn( f"No cost data available for Nuclear for {cost_case} cost case, " "using Moderate cost case data instead" ) new_nuclear = cost.query( "Technology == 'Nuclear' and CostCase == 'Moderate'" ).copy() new_nuclear.CostCase = cost_case cost = pd.concat([cost, new_nuclear], ignore_index=True) cost = cost[cost["CostCase"] == cost_case] cost.drop(["CostCase"], axis=1, inplace=True) return cost if isinstance(year, (int, str)): year = int(year) if year not in range(2020, 2051): raise ValueError("year not in range.") else: raise TypeError("year must be int or str.") if isinstance(cost_case, str): if cost_case not in ["Moderate", "Conservative", "Advanced"]: raise ValueError("cost_case not Moderate, Conservative, or Advanced") else: raise TypeError("cost_case must be str.") storage_plants = grid_new.storage["gen"].set_index( grid_new.storage["StorageData"].UnitIdx.astype(int) ) plants = append_keep_index_name(grid_new.plant, storage_plants) plants = plants[ ~plants.type.isin(["dfo", "other"]) ] # drop these technologies, no cost data # BASE TECHNOLOGY COST # load in investment costs $/MW gen_costs = load_cost(year, cost_case) # keep only certain (arbitrary) subclasses for now gen_costs = gen_costs[ gen_costs["TechDetail"].isin(const.gen_inv_cost_techdetails_to_keep) ] # rename techs to match grid object gen_costs.replace(const.gen_inv_cost_translation, inplace=True) gen_costs.drop(["Key", "FinancialCase", "CRPYears"], axis=1, inplace=True) # ATB technology costs merge plants = merge_keep_index( plants, gen_costs, right_on="Technology", left_on="type", how="left" ) # REGIONAL COST MULTIPLIER # Find ReEDS regions of plants (for regional cost multipliers) plant_buses = plants.bus_id.unique() try: bus_reg = pd.read_csv(const.bus_reeds_regions_path, index_col="bus_id") if not set(plant_buses) <= set(bus_reg.index): missing_buses = set(plant_buses) - set(bus_reg.index) bus_reg = bus_reg.append(bus_to_reeds_reg(grid_new.bus.loc[missing_buses])) bus_reg.sort_index().to_csv(const.bus_reeds_regions_path) except FileNotFoundError: bus_reg = bus_to_reeds_reg(grid_new.bus.loc[plant_buses]) bus_reg.sort_index().to_csv(const.bus_reeds_regions_path) plants = merge_keep_index( plants, bus_reg, left_on="bus_id", right_index=True, how="left" ) # Determine one region 'r' for each plant, based on one of two mappings plants.loc[:, "r"] = "" # Some types get regional multipliers via 'wind regions' ('rs') wind_region_mask = plants["type"].isin(const.regional_multiplier_wind_region_types) plants.loc[wind_region_mask, "r"] = plants.loc[wind_region_mask, "rs"] # Other types get regional multipliers via 'BA regions' ('rb') ba_region_mask = plants["type"].isin(const.regional_multiplier_ba_region_types) plants.loc[ba_region_mask, "r"] = plants.loc[ba_region_mask, "rb"] plants.drop(["rs", "rb"], axis=1, inplace=True) # merge regional multipliers with plants region_multiplier = pd.read_csv(const.regional_multiplier_path) region_multiplier.replace(const.regional_multiplier_gen_translation, inplace=True) plants = merge_keep_index( plants, region_multiplier, left_on=["r", "Technology"], right_on=["r", "i"], how="left", ) # multiply all together to get summed CAPEX ($) plants.loc[:, "cost"] = ( plants["CAPEX"] * plants["Pmax"] * plants["reg_cap_cost_mult"] ) # sum cost by technology plants.loc[:, "cost"] *= calculate_inflation(2018) if sum_results: return plants.groupby(["Technology"])["cost"].sum() else: return plants["cost"]