Source code for powersimdata.design.transmission.upgrade

import pandas as pd

from powersimdata.design.investment.investment_costs import _calculate_ac_inv_costs
from powersimdata.input.grid import Grid
from powersimdata.utility.distance import haversine


def _find_branches_connected_to_bus(branch, bus_id):
    """Find all branches connected to a given bus.

    :param pandas.DataFrame branch: branch DataFrame from Grid object.
    :param int bus_id: index of bus to find neighbors of.
    :return: (*set*) -- set of bus indexes (integers).
    """
    branches_from = branch.index[branch["from_bus_id"] == bus_id].tolist()
    branches_to = branch.index[branch["to_bus_id"] == bus_id].tolist()

    branches_connected = set(branches_from) | set(branches_to)

    return branches_connected


def _find_first_degree_branches(branch, branch_id):
    """Find all branches connected to a given branch.

    :param pandas.DataFrame branch: branch data frame from Grid object.
    :param int branch_id: index of branches to find neighbors of.
    :return: (*set*) -- set of branch indexes (integers).
    """
    from_bus = branch.loc[branch_id, "from_bus_id"]
    to_bus = branch.loc[branch_id, "to_bus_id"]
    endpoints = (from_bus, to_bus)
    to_endpoints = branch.index[branch["to_bus_id"].isin(endpoints)]
    from_endpoints = branch.index[branch["from_bus_id"].isin(endpoints)]
    first_degree_branch_idxs = set(to_endpoints) | set(from_endpoints)

    return first_degree_branch_idxs


def _find_stub_degree(branch, bus_id):
    """Find degree of stubbiness, and stub branches.

    :param pandas.DataFrame branch: branch data frame from Grid object.
    :param int bus_id: index of bus to find subbiness of.
    :return: (*tuple*) -- tuple containing:
        stub_degree (*int*) -- How stubby (non-negative integer).
        connected_branches (*set*) -- set of branch indexes (integers).
    """
    connected_branch_idxs = _find_branches_connected_to_bus(branch, bus_id)
    if len(connected_branch_idxs) == 1:
        second_degree_branch_idxs = _find_first_degree_branches(
            branch, tuple(connected_branch_idxs)[0]
        )
        if len(second_degree_branch_idxs) == 2:
            # We could keep going recursively, but this is the max in Western.
            return 2, second_degree_branch_idxs
        else:
            return 1, connected_branch_idxs
    else:
        return 0, set()


def _find_capacity_at_bus(plant, bus_id, gentypes):
    """Find total capacity of plants with the given type(s) at the given bus.

    :param pandas.DataFrame plant: plant data frame from Grid object.
    :param int bus_id: index of bus to find generators at.
    :param list/tuple/set/str gentypes: type of generators to sum capacity for.
    :return: (*float*) -- total capacity at bus.
    """

    if isinstance(gentypes, str):
        gentypes = (gentypes,)
    gentype_plants = plant[plant["type"].isin(gentypes)]
    at_bus = gentype_plants["bus_id"] == bus_id
    gentype_plants_at_bus = gentype_plants[at_bus]
    gentype_capacity = gentype_plants_at_bus["Pmax"].sum()

    return gentype_capacity


[docs]def scale_renewable_stubs(change_table, fuzz=1, inplace=True, verbose=False): """Identify renewable gens behind 'stub' branches, scale up branch capacity (via change_table entries) to match generator capacity. :param powersimdata.input.change_table.ChangeTable change_table: ChangeTable object. :param float/int fuzz: adds just a little extra capacity to avoid binding. :param bool inplace: if True, modify ct inplace and return None. If False, copy ct and return modified copy. :param bool verbose: if True, print info for each unscaled plant. :return: (*None*/*dict*) -- if inplace is True, return modified ct dict. """ if inplace: ct = change_table.ct else: ct = change_table.ct.copy() ref_plant = change_table.grid.plant ref_branch = change_table.grid.branch ref_bus = change_table.grid.bus if "branch" not in ct: ct["branch"] = {} if "branch_id" not in ct["branch"]: ct["branch"]["branch_id"] = {} branch_id_ct = ct["branch"]["branch_id"] for r in change_table.grid.model_immutables.plants["profile_resources"]: ren_plants = ref_plant[ref_plant["type"] == r] for p in ren_plants.index: bus_id = ref_plant.loc[p, "bus_id"] stub_degree, stub_branches = _find_stub_degree(ref_branch, bus_id) if stub_degree > 0: ren_capacity = _find_capacity_at_bus(ref_plant, bus_id, r) if ren_capacity <= 0: print("%s plant %s at bus %s has 0 Pmax!" % (r, p, bus_id)) continue # Calculate total scaling factor (zone * plant) gen_scale_factor = 1 # First scale by zone_id zone_id = ref_bus.loc[bus_id, "zone_id"] try: gen_scale_factor *= ct[r]["zone_id"][zone_id] except KeyError: pass # Then scale by plant_id try: gen_scale_factor *= ct[r]["plant_id"][p] except KeyError: pass if verbose and gen_scale_factor == 1: print(f"no scaling factor for {r}, {zone_id}, plant {p}") for b in stub_branches: if ref_branch.loc[b, "rateA"] == 0: continue old_branch_cap = ref_branch.loc[b, "rateA"] if old_branch_cap < ren_capacity * gen_scale_factor: new_branch_cap = ren_capacity * gen_scale_factor + fuzz branch_id_ct[b] = new_branch_cap / old_branch_cap if not inplace: return ct
[docs]def get_branches_by_area(grid, area_names, method="either"): """Given a set of area names, select branches which are in one or more of these areas. :param powersimdata.input.grid.Grid grid: Grid to query for topology. :param list/set/tuple area_names: an iterable of area names, used to look up zone names. :param str method: whether to include branches which span zones. Options: - 'internal': only select branches which are to/from the same area. - 'bridging': only select branches which connect area to another. - 'either': select branches if either end is in area. Equivalent to 'internal' + 'bridging'. :return: (*set*) -- a set of branch IDs. :raise TypeError: if area_names not a list/set/tuple, or method not a str. :raise ValueError: if not all elements of area_names are strings, if method is not one of the recognized methods. """ allowed_methods = {"internal", "bridging", "either"} if not isinstance(grid, Grid): raise TypeError("grid must be a Grid object") if not isinstance(area_names, (list, set, tuple)): raise TypeError("area_names must be list, set, or tuple") if not all([isinstance(a, str) for a in area_names]): raise ValueError("each value in area_names must be a str") if not isinstance(method, str): raise TypeError("method must be a str") if method not in allowed_methods: raise ValueError("valid methods are: " + " | ".join(allowed_methods)) branch = grid.branch selected_branches = set() for a in area_names: load_zone_names = grid.model_immutables.area_to_loadzone(a) to_bus_in_area = branch.to_zone_name.isin(load_zone_names) from_bus_in_area = branch.from_zone_name.isin(load_zone_names) if method in ("internal", "either"): internal_branches = branch[to_bus_in_area & from_bus_in_area].index selected_branches |= set(internal_branches) if method in ("bridging", "either"): bridging_branches = branch[to_bus_in_area ^ from_bus_in_area].index selected_branches |= set(bridging_branches) return selected_branches
[docs]def scale_congested_mesh_branches( change_table, ref_scenario, upgrade_n=100, allow_list=None, deny_list=None, congestion_metric="quantile", cost_metric="branches", quantile=None, increment=1, ): """Use a reference scenario to detect congested mesh branches (based on values of the shadow price of congestion), and increase the capacity of a subset of them. Branches are ranked by a ratio of (congestion metric) / (cost metric), and the top N branches are selected for upgrading, where N is specified by ``upgraade_n``, with each upgraded by their base capacity multiplied by ``increment``. :param powersimdata.input.change_table.ChangeTable change_table: ChangeTable object. :param powersimdata.scenario.scenario.Scenario ref_scenario: the reference scenario to be used in bootstrapping the branch scaling factors. :param int upgrade_n: the number of branches to upgrade. :param list/set/tuple/None allow_list: only select from these branch IDs. :param list/set/tuple/None deny_list: never select any of these branch IDs. :param str congestion_metric: numerator method: 'quantile' or 'mean'. :param str cost_metric: denominator method: 'branches', 'cost', 'MW', or 'MWmiles'. :param float quantile: if ``congestion_metric`` == 'quantile', this is the quantile to use to judge branch congestion (otherwise it is unused). If None, a default value of 0.95 is used, i.e. we evaluate the shadow price for the worst 5% of hours. :param float/int increment: branch increment, relative to original capacity. :return: (*None*) -- the change_table is modified in-place. :raises ValueError: if ``ref_scenario`` is not in analyze state. """ if ref_scenario.state.name != "analyze": raise ValueError("ref_scenario must be in Analyze state") branches_to_upgrade = _identify_mesh_branch_upgrades( ref_scenario, upgrade_n=upgrade_n, congestion_metric=congestion_metric, cost_metric=cost_metric, allow_list=allow_list, deny_list=deny_list, quantile=quantile, ) _increment_branch_scaling( change_table, branches_to_upgrade, ref_scenario, value=increment )
def _identify_mesh_branch_upgrades( ref_scenario, upgrade_n=100, allow_list=None, deny_list=None, congestion_metric="quantile", cost_metric="branches", quantile=None, ): """Identify the N most congested branches in a previous scenario, based on the quantile value of congestion duals, where N is specified by ``upgrade_n``. A quantile value of 0.95 obtains the branches with highest dual in top 5% of hours. :param powersimdata.scenario.scenario.Scenario ref_scenario: the reference scenario to be used to determine the most congested branches. :param int upgrade_n: the number of branches to upgrade. :param list/set/tuple/None allow_list: only select from these branch IDs. :param list/set/tuple/None deny_list: never select any of these branch IDs. :param str congestion_metric: numerator method: 'quantile' or 'mean'. :param str cost_metric: denominator method: 'branches', 'cost', 'MW', or 'MWmiles'. :param float quantile: if ``congestion_metric`` == 'quantile', this is the quantile to use to judge branch congestion (otherwise it is unused). If None, a default value of 0.95 is used, i.e. we evaluate the shadow price for the worst 5% of hours. :raises ValueError: if ``congestion_metric`` or ``cost_metric`` is not recognized, ``congestion_metric`` == 'mean' but a ``quantile`` is specified, or ``congestion_metric`` == 'quantile' but there are not enough branches which are congested at the desired frequency based on the ``quantile`` specified. :return: (*set*) -- A set of ints representing branch indices. """ # How big does a dual value have to be to be 'real' and not barrier cruft? cong_significance_cutoff = 1e-6 # $/MWh # If we rank by MW-miles, what 'length' do we give to zero-length branches? zero_length_value = 1 # miles # If the quantile is not provided, what should we default to? default_quantile = 0.95 # Validate congestion_metric input allowed_congestion_metrics = ("mean", "quantile") if congestion_metric not in allowed_congestion_metrics: allowed_list = ", ".join(allowed_congestion_metrics) raise ValueError(f"congestion_metric must be one of: {allowed_list}") if congestion_metric == "mean" and quantile is not None: raise ValueError("quantile cannot be specified if congestion_metric is 'mean'") if congestion_metric == "quantile" and quantile is None: quantile = default_quantile # Validate cost_metric input allowed_cost_metrics = ("branches", "MW", "MWmiles", "cost") if cost_metric not in allowed_cost_metrics: allowed_list = ", ".join(allowed_cost_metrics) raise ValueError(f"cost_metric must be one of: {allowed_list}") # Get raw congestion dual values, add them ref_cong_abs = ref_scenario.state.get_congu() + ref_scenario.state.get_congl() all_branches = set(ref_cong_abs.columns.tolist()) # Create validated composite allow list, and filter shadow price data frame composite_allow_list = _construct_composite_allow_list( all_branches, allow_list, deny_list ) ref_cong_abs = ref_cong_abs.filter(items=composite_allow_list) if congestion_metric == "mean": congestion_metric_values = ref_cong_abs.mean() if congestion_metric == "quantile": congestion_metric_values = ref_cong_abs.quantile(quantile) # Filter out 'insignificant' values congestion_metric_values = congestion_metric_values.where( congestion_metric_values > cong_significance_cutoff ).dropna() # Filter based on composite allow list congested_indices = list(congestion_metric_values.index) # Ensure that we have enough congested branches to upgrade num_congested = len(congested_indices) if num_congested < upgrade_n: err_msg = "not enough congested branches: " err_msg += f"{upgrade_n} desired, but only {num_congested} congested." if congestion_metric == "quantile": err_msg += ( f" The quantile used is {quantile}; increasing this value will increase" " the number of branches which qualify as having 'frequent-enough'" " congestion and can be selected for upgrades." ) raise ValueError(err_msg) # Calculate selected cost metric for congested branches if cost_metric == "cost": # Calculate costs for an upgrade dataframe containing only composite_allow_list base_grid = ref_scenario.get_base_grid() base_grid.branch = base_grid.branch.filter(items=congested_indices, axis=0) upgrade_costs = _calculate_ac_inv_costs(base_grid, sum_results=False) # Merge the individual line/transformer data into a single Series merged_upgrade_costs = pd.concat([v for v in upgrade_costs.values()]) if cost_metric in ("MW", "MWmiles"): ref_grid = ref_scenario.state.get_grid() branch_ratings = ref_grid.branch.loc[congested_indices, "rateA"] # Calculate 'original' branch capacities, since that's our increment ref_ct = ref_scenario.state.get_ct() try: branch_ct = ref_ct["branch"]["branch_id"] except KeyError: branch_ct = {} branch_prev_scaling = pd.Series( {i: (branch_ct[i] if i in branch_ct else 1) for i in congested_indices} ) branch_ratings = branch_ratings / branch_prev_scaling # Then, apply this metric if cost_metric == "MW": branch_metric = congestion_metric_values / branch_ratings elif cost_metric == "MWmiles": branch_lengths = ref_grid.branch.loc[congested_indices].apply( lambda x: haversine((x.from_lat, x.from_lon), (x.to_lat, x.to_lon)), axis=1 ) # Replace zero-length branches by designated default, don't divide by 0 branch_lengths = branch_lengths.replace(0, value=zero_length_value) branch_metric = congestion_metric_values / (branch_ratings * branch_lengths) elif cost_metric == "cost": branch_metric = congestion_metric_values / merged_upgrade_costs else: # By process of elimination, all that's left is method 'branches' branch_metric = congestion_metric_values # Sort by our metric, grab indexes for N largest values (tail), return ranked_branches = set(branch_metric.sort_values().tail(upgrade_n).index) return ranked_branches def _construct_composite_allow_list(valid_branches, allow_list, deny_list): """Create a set of allowed branches by selecting from a set of all branches either only from the allow list, or everything but the deny list. :param list/set/tuple valid_branches: List of valid branches to select. :param list/set/tuple/None allow_list: only select from these branch IDs. :param list/set/tuple/None deny_list: never select any of these branch IDs. :return (*set*) -- set of allowed branch IDs. :raises ValueError: if both allow_list and deny_list are specified, or if allow_list or deny_list contain IDs not in valid_branches. :raises TypeError: if valid_branches, allow_list, or deny_list are bad type. """ # Validate valid_branches/allow_list/deny_list Type and combination iterable_types = (list, set, tuple) if not isinstance(valid_branches, iterable_types): raise TypeError("allow_list must be a list, tuple, set, or None") if not ((allow_list is None) or isinstance(allow_list, iterable_types)): raise TypeError("allow_list must be a list, tuple, set, or None") if not ((deny_list is None) or isinstance(deny_list, iterable_types)): raise TypeError("deny_list must be a list, tuple, set, or None") if (allow_list is not None) and (deny_list is not None): raise ValueError("Cannot specify both allow_list and deny_list") # Validate allow_list (if not None), create set of allowed branch IDs if allow_list is None: composite_allow_list = set(valid_branches) else: if set(allow_list) <= set(valid_branches): composite_allow_list = set(allow_list) else: raise ValueError("allow_list contains branch IDs not in results") # Validate deny_list (if not None), subtract from set of allowed branch IDs if deny_list is not None: if set(deny_list) <= set(valid_branches): composite_allow_list -= set(deny_list) else: raise ValueError("deny_list contains branch IDs not in results") return composite_allow_list def _increment_branch_scaling(change_table, branch_ids, ref_scenario, value=1): """Modify the ct dict of a ChangeTable object based on branch scaling from both a reference scenario and a set of branches to increment by a value. :param powersimdata.input.change_table.ChangeTable change_table: the change table instance we are operating on. :param list/set/tuple branch_ids: an iterable of branch indices. :param powersimdata.scenario.scenario.Scenario ref_scenario: the reference scenario to copy branch scaling from. :param int/float value: branch increment, relative to original capacity. :return: (*None*) -- the change_table is modified inplace. """ # Ensure that ct has proper keys ct = change_table.ct if "branch" not in ct: ct["branch"] = {} if "branch_id" not in ct["branch"]: ct["branch"]["branch_id"] = {} branch_scaling = ct["branch"]["branch_id"] # Get previous scenario's branch scaling ref_ct = ref_scenario.state.get_ct() try: ref_branch_scaling = ref_ct["branch"]["branch_id"].copy() except KeyError: ref_branch_scaling = {} # Determine the final ref branch scaling after incrementing for branch in branch_ids: if branch in ref_branch_scaling: ref_branch_scaling[branch] += value else: ref_branch_scaling[branch] = 1 + value # Then merge the ref branch scaling in, unless original scaling is greater for branch in ref_branch_scaling: if branch in branch_scaling: new_scale = max(branch_scaling[branch], ref_branch_scaling[branch]) branch_scaling[branch] = new_scale else: branch_scaling[branch] = ref_branch_scaling[branch]