Source code for powersimdata.design.generation.clean_capacity_scaling

import numpy as np
import pandas as pd

from powersimdata.design.mimic_grid import mimic_generation_capacity
from powersimdata.scenario.scenario import Scenario


def _check_solar_fraction(solar_fraction):
    """Validate the requested solar share of new capacity.

    :param int/float/None solar_fraction: desired solar fraction for new capacity.
        None is accepted and skips all further checks.
    :raises TypeError: if solar_fraction is not an int, a float, or None.
    :raises ValueError: if solar_fraction is a number that is not within [0, 1].
    """
    if solar_fraction is None:
        return
    if not isinstance(solar_fraction, (int, float)):
        raise TypeError("solar_fraction must be int/float or None")
    # Written as a negated chained comparison so that NaN is rejected as well.
    in_range = 0 <= solar_fraction <= 1
    if not in_range:
        raise ValueError("solar_fraction must be between 0 and 1")


def _apply_zone_scale_factor_to_ct(ct, fuel, zone_id, scale_factor):
    """Fold a zone-level scale factor into a change table dictionary, in place.

    Missing ``ct[fuel]`` and ``ct[fuel]["zone_id"]`` levels are created on demand.
    A zone that has no factor yet receives ``scale_factor``; a zone that already
    has one gets its existing factor multiplied by ``scale_factor``.

    :param dict ct: a dictionary of scale factors, with structure matching
        ct from powersimdata.input.change_table.ChangeTable.
    :param str fuel: the fuel to be scaled.
    :param int zone_id: the zone_id to be scaled.
    :param int/float scale_factor: how much the zone should be scaled up by.
    """
    zone_factors = ct.setdefault(fuel, {}).setdefault("zone_id", {})
    if zone_id in zone_factors:
        zone_factors[zone_id] *= scale_factor
    else:
        zone_factors[zone_id] = scale_factor


def load_targets_from_csv(filename, drop_ignored=True):
    """Interpret a CSV file as a set of targets, ensuring that required columns are
    present, and filling in default values for optional columns.

    The returned table is indexed by ``region_name``. Columns that are neither
    mandatory nor optional are reported via ``print`` and, if requested, dropped.

    :param str filename: filepath to targets csv.
    :param bool drop_ignored: if True, drop all ignored columns from output.
    :return: (*pandas.DataFrame*) -- DataFrame of targets from csv file.
    :raises TypeError: if filename is not a string.
    :raises ValueError: if one or more required columns is missing.
    """
    # Constants
    mandatory_columns = {"region_name", "ce_target_fraction"}
    optional_column_defaults = {
        "allowed_resources": "solar, wind",
        "external_ce_addl_historical_amount": 0,
        "solar_percentage": np.nan,
        "area_type": np.nan,
    }

    # Validate input
    if not isinstance(filename, str):
        raise TypeError("filename must be a str")

    # Interpret as object so that we can fillna() with a mixed-type dict
    raw_targets = pd.read_csv(filename).astype(object)
    raw_columns = set(raw_targets.columns)
    missing_columns = mandatory_columns - raw_columns
    if missing_columns:
        # Sorted so that the message does not depend on set iteration order
        raise ValueError(f'Missing columns: {", ".join(sorted(missing_columns))}')
    raw_targets.set_index("region_name", inplace=True)

    # Report which columns are used vs. unused
    ignored_columns = raw_columns - mandatory_columns - optional_column_defaults.keys()
    print(f"ignoring: {ignored_columns}")
    if drop_ignored:
        raw_targets.drop(columns=list(ignored_columns), inplace=True)

    # Create optional columns that are missing entirely
    for column in optional_column_defaults:
        if column not in raw_columns:
            raw_targets[column] = np.nan
    # Fill any empty cells within optional columns
    raw_targets = raw_targets.fillna(value=optional_column_defaults)

    return raw_targets
def _make_zonename2target(grid, targets):
    """Create a dictionary of {zone_name: target_name} pairs.

    Each target name (index of ``targets``) is resolved to a set of load zones via
    ``grid.model_immutables.area_to_loadzone``; the ``area_type`` column is passed
    along whenever it is not null for that target.

    :param powersimdata.input.grid.Grid grid: Grid instance defining the set of zones.
    :param pandas.DataFrame targets: a dataframe used to look up constituent zones.
    :return: (*dict*) -- a dictionary of {zone_name: target_name} pairs.
    :raises ValueError: if a zone is not present in any target areas, or
        if a zone is present in more than one target area.
    """
    immutables = grid.model_immutables
    target_zones = {}
    for target_name in targets.index.tolist():
        area_type = targets.loc[target_name, "area_type"]
        if pd.isnull(area_type):
            target_zones[target_name] = immutables.area_to_loadzone(target_name)
        else:
            target_zones[target_name] = immutables.area_to_loadzone(
                target_name, area_type=area_type
            )

    # Check for any collisions. A single pass builds zone -> {areas}, which also
    # copes with an empty targets table (no sets to union).
    areas_by_zone = {}
    for area, zone_set in target_zones.items():
        for zone in zone_set:
            areas_by_zone.setdefault(zone, set()).add(area)
    error_areas = {
        zone: areas for zone, areas in areas_by_zone.items() if len(areas) > 1
    }
    if error_areas:
        error_msgs = [f"{k} within: {', '.join(v)}" for k, v in error_areas.items()]
        raise ValueError(f"Zone(s) within multiple area! {'; '.join(error_msgs)}")

    grid_zones = set(grid.zone2id.keys())
    zonename2target = {}
    for target_name, zone_set in target_zones.items():
        # Filter out parts of states not in the interconnect(s) in this Grid
        filtered_zone_set = zone_set & grid_zones
        zonename2target.update({zone: target_name for zone in filtered_zone_set})

    untargetted_zones = grid_zones - set(zonename2target.keys())
    if len(untargetted_zones) > 0:
        err_msg = f"Targets do not cover all load zones. Missing: {untargetted_zones}"
        raise ValueError(err_msg)

    return zonename2target


def _get_scenario_length(scenario):
    """Get the number of hours in a scenario.

    For a scenario in the "create" state the dates are read from the state's
    builder; otherwise they are read from ``scenario.info``. Both end points are
    counted, hence the ``+ 1``.

    :param powersimdata.scenario.scenario.Scenario scenario: A Scenario instance.
    :return: (*float*) -- the number of hours in the scenario. The value comes from
        a Timedelta division, so it is a float even for whole numbers of hours.
    :raises TypeError: if scenario is not a Scenario object.
    """
    if not isinstance(scenario, Scenario):
        raise TypeError("next_scenario must be a Scenario object")
    if scenario.state.name == "create":
        start_ts = pd.Timestamp(scenario.state.builder.start_date)
        end_ts = pd.Timestamp(scenario.state.builder.end_date)
    else:
        start_ts = pd.Timestamp(scenario.info["start_date"])
        end_ts = pd.Timestamp(scenario.info["end_date"])
    num_hours = (end_ts - start_ts) / pd.Timedelta(hours=1) + 1
    return num_hours
def add_resource_data_to_targets(input_targets, scenario, calculate_curtailment=False):
    """Add resource data to targets. This data includes: previous capacity,
    previous generation, previous capacity factor (with and without curtailment),
    and previous curtailment.

    For every resource type with non-zero total capacity, ``{type}.prev_capacity``
    and ``{type}.prev_generation`` columns are added. For hydro, solar and wind,
    ``{type}.prev_cap_factor`` and a zero-valued ``{type}.addl_curtailment`` are
    added as well, plus ``{type}.no_curtailment_cap_factor`` and
    ``{type}.curtailment`` when ``calculate_curtailment`` is True.

    :param pandas.DataFrame input_targets: table including target names, used to
        summarize resource data.
    :param powersimdata.scenario.scenario.Scenario scenario: A Scenario instance.
    :param bool calculate_curtailment: whether to also load the hydro/solar/wind
        profiles and derive curtailment figures from them.
    :return: (*pandas.DataFrame*) -- data frame of targets including resource data.
    """
    targets = input_targets.copy()
    grid = scenario.get_grid()
    plant = grid.plant
    curtailment_types = ["hydro", "solar", "wind"]
    scenario_length = _get_scenario_length(scenario)

    # Map each zone in the grid to a target
    zonename2target = _make_zonename2target(grid, targets)
    # NOTE(review): this adds a column to the plant table of the Grid returned by
    # scenario.get_grid(); whether that Grid is a private copy is not visible here.
    plant["target_area"] = [zonename2target[z] for z in plant["zone_name"]]

    # Summarize important values by target area & type
    groupby_cols = [plant.target_area, plant.type]
    # Capacity
    capacity_by_target_type = (
        plant.Pmax.groupby(groupby_cols).sum().unstack(fill_value=0)
    )
    # Generated energy
    summed_generation = (
        scenario.get_pg().sum().groupby(groupby_cols).sum().unstack(fill_value=0)
    )

    # Calculate capacity factors
    possible_energy = scenario_length * capacity_by_target_type[curtailment_types]
    capacity_factor = summed_generation[curtailment_types] / possible_energy

    if calculate_curtailment:
        # Calculate: curtailment, no_curtailment_cap_factor
        profile_resources = grid.model_immutables.plants["group_profile_resources"]

        # Hydro and solar are straightforward
        hydro_plant_sum = scenario.get_profile("hydro").sum()
        hydro_plant_targets = plant[plant["type"].isin(profile_resources["hydro"])][
            "target_area"
        ]
        hydro_potential_by_target = hydro_plant_sum.groupby(hydro_plant_targets).sum()

        solar_plant_sum = scenario.get_profile("solar").sum()
        solar_plant_targets = plant[plant["type"].isin(profile_resources["solar"])][
            "target_area"
        ]
        solar_potential_by_target = solar_plant_sum.groupby(solar_plant_targets).sum()

        # Only onshore wind plants are taken from the wind profile
        onshore_wind_type = grid.model_immutables.plants["label2type"]["Onshore Wind"]
        is_onshore_wind = plant["type"] == onshore_wind_type
        onshore_wind_plants = plant[is_onshore_wind].index
        onshore_wind_plant_sum = scenario.get_profile("wind").sum()[onshore_wind_plants]
        wind_plant_targets = plant[is_onshore_wind].target_area
        wind_potential_by_target = onshore_wind_plant_sum.groupby(
            wind_plant_targets
        ).sum()

        potentials_series = [
            hydro_potential_by_target,
            solar_potential_by_target,
            wind_potential_by_target,
        ]
        # The per-resource Series are unnamed, so label the columns explicitly;
        # otherwise they come out as 0/1/2 and never align with the
        # hydro/solar/wind columns used in the arithmetic below.
        potential = pd.concat(potentials_series, axis=1, keys=curtailment_types)
        curtailment = (
            potential - summed_generation[curtailment_types]
        ) / possible_energy
        no_curtailment_cap_factor = potential / possible_energy

    # Now add these calculations to the DataFrame
    total_capacity = capacity_by_target_type.sum()
    nonzero_capacity_resources = total_capacity[total_capacity > 0].index.tolist()
    for r in nonzero_capacity_resources:
        targets[f"{r}.prev_capacity"] = capacity_by_target_type[r]
        targets[f"{r}.prev_generation"] = summed_generation[r]
        if r in curtailment_types:
            targets[f"{r}.prev_cap_factor"] = capacity_factor[r]
            targets[f"{r}.addl_curtailment"] = 0
            if calculate_curtailment:
                targets[f"{r}.no_curtailment_cap_factor"] = no_curtailment_cap_factor[r]
                targets[f"{r}.curtailment"] = curtailment[r]

    return targets
def add_demand_to_targets(input_targets, scenario):
    """Add demand data to targets.

    The scenario's demand is summed over time for each zone, and the zone totals
    are then summed within each target to fill a ``demand`` column.

    :param pandas.DataFrame input_targets: table including target names, used to
        summarize demand.
    :param powersimdata.scenario.scenario.Scenario scenario: A Scenario instance.
    :return: (*pandas.DataFrame*) -- DataFrame of targets including demand data.
    """
    grid = scenario.state.get_grid()
    targets = input_targets.copy()

    zonename2target = _make_zonename2target(grid, targets)
    zoneid2target = {
        grid.zone2id[zone_name]: target
        for zone_name, target in zonename2target.items()
    }

    # Total demand per zone id, then grouped by the target owning each zone
    demand_by_zone = scenario.get_demand().sum()
    zone_targets = [zoneid2target[zone_id] for zone_id in demand_by_zone.index]
    targets["demand"] = demand_by_zone.groupby(zone_targets).sum()

    return targets
def add_shortfall_to_targets(input_targets):
    """Add shortfall data to targets.

    Adds the columns ``prev_ce_generation`` (sum of previous generation over each
    target's allowed resources), ``ce_target`` (demand times target fraction),
    ``ce_shortfall`` and ``ce_overgeneration`` (positive and negative parts of the
    gap between the target and clean generation, including the external
    historical amount). The input table is not modified.

    :param pandas.DataFrame input_targets: table with demand, prev_generation,
        and ce_target_fraction.
    :return: (*pandas.DataFrame*) -- DataFrame of targets including shortfall data.
    """
    targets = input_targets.copy()

    # 'allowed_resources' holds a comma-separated list of resource names per target
    allowed_sets = {
        target: {resource.strip() for resource in allowed.split(",")}
        for target, allowed in targets.allowed_resources.to_dict().items()
    }

    # Detect if there are allowed resources that aren't in the grid, and add them
    all_allowed = set().union(*allowed_sets.values())
    for resource in all_allowed:
        if f"{resource}.prev_generation" not in targets.columns:
            targets[f"{resource}.prev_generation"] = 0

    def sum_allowed_generation(row):
        # row.name is the target name, i.e. the index label of the row
        return sum([row[f"{r}.prev_generation"] for r in allowed_sets[row.name]])

    targets["prev_ce_generation"] = targets.apply(sum_allowed_generation, axis=1)
    targets["ce_target"] = targets.demand * targets.ce_target_fraction

    total_ce_generation = (
        targets.prev_ce_generation + targets.external_ce_addl_historical_amount
    )
    raw_shortfall = targets.ce_target - total_ce_generation
    targets["ce_shortfall"] = raw_shortfall.clip(lower=0)
    targets["ce_overgeneration"] = (-1 * raw_shortfall).clip(lower=0)

    return targets
def calculate_overall_shortfall(targets, method, normalized=False):
    """Calculate overall shortfall.

    With the "independent" method, the shortfalls of all targets are summed. With
    the "collaborative" method, only targets with a positive ``ce_target`` take
    part, and their overgeneration offsets their shortfall.

    :param pandas.DataFrame targets: table of targets.
    :param str method: shortfall calculation method ("independent" or
        "collaborative").
    :param bool normalized: whether to normalize by total demand.
    :return: (*float*) -- overall shortfall, either in MWh or normalized by
        total demand.
    :raises TypeError: if targets is not a DataFrame or normalized is not a bool.
    :raises ValueError: if targets has no ``ce_shortfall`` column, or if method is
        not one of the allowed methods.
    """
    allowed_methods = {"independent", "collaborative"}
    if not isinstance(targets, pd.DataFrame):
        raise TypeError("targets must be a pandas DataFrame")
    if "ce_shortfall" not in targets.columns:
        raise ValueError("targets missing shortfall, see add_shortfall_to_targets()")
    if not isinstance(normalized, bool):
        raise TypeError("normalized must be bool")
    if method not in allowed_methods:
        raise ValueError(f"method must be one of: {allowed_methods}")

    if method == "collaborative":
        participating_targets = targets[targets.ce_target > 0]
        summed_shortfall = participating_targets.ce_shortfall.sum()
        summed_overgeneration = participating_targets.ce_overgeneration.sum()
        overall_shortfall = summed_shortfall - summed_overgeneration
    else:
        overall_shortfall = targets.ce_shortfall.sum()

    if normalized:
        # Normalization uses the demand of all targets, participating or not
        return overall_shortfall / targets.demand.sum()
    return overall_shortfall
def add_new_capacities_independent(
    input_targets, scenario_length, addl_curtailment=None
):
    """Calculate new capacities based on an Independent strategy.

    Each target closes its own ``ce_shortfall`` with new solar and wind capacity.
    The solar share of the new capacity is the target's ``solar_percentage`` or,
    when that is null, the solar share of its previous solar + wind capacity.

    Neither ``input_targets`` nor ``addl_curtailment`` is modified. Values given in
    ``addl_curtailment`` replace any ``{resource}.addl_curtailment`` columns that
    are already in the targets table; targets missing from it are treated as
    having zero additional curtailment.

    :param pandas.DataFrame input_targets: table of targets.
    :param int scenario_length: number of hours in new scenario.
    :param pandas.DataFrame/None addl_curtailment: additional expected curtailment
        by target/resource (index: target names, columns: resource names). If
        None, assumed zero for all targets/resources that do not already carry an
        ``addl_curtailment`` column.
    :return: (*pandas.DataFrame*) -- targets dataframe with next capacities added.
    """
    resources = ["solar", "wind"]

    def calculate_added_capacity(target):
        # Share of the new capacity that should be solar
        if pd.isnull(target["solar_percentage"]):
            new_solar_percentage = target["solar.prev_capacity"] / (
                target["solar.prev_capacity"] + target["wind.prev_capacity"]
            )
        else:
            new_solar_percentage = target["solar_percentage"]
        new_wind_percentage = 1 - new_solar_percentage

        # Previous capacity factors, reduced by the expected extra curtailment
        solar_expected_cf = target["solar.prev_cap_factor"] * (
            1 - target["solar.addl_curtailment"]
        )
        wind_expected_cf = target["wind.prev_cap_factor"] * (
            1 - target["wind.addl_curtailment"]
        )

        # A NaN capacity factor means that resource contributes nothing here
        if np.isnan(solar_expected_cf):
            avg_new_cf = wind_expected_cf * new_wind_percentage
        elif np.isnan(wind_expected_cf):
            avg_new_cf = solar_expected_cf * new_solar_percentage
        else:
            avg_new_cf = (
                solar_expected_cf * new_solar_percentage
                + wind_expected_cf * new_wind_percentage
            )

        total_new_capacity = target["ce_shortfall"] / (avg_new_cf * scenario_length)
        new_solar = total_new_capacity * new_solar_percentage
        new_wind = total_new_capacity * (1 - new_solar_percentage)
        return new_solar, new_wind

    # Parse inputs
    targets = input_targets.copy()
    for r in resources:
        column = f"{r}.addl_curtailment"
        if addl_curtailment is not None and r in addl_curtailment.columns:
            # Assign rather than join: the column may already exist in targets,
            # and the caller's DataFrame must not be renamed in place.
            targets[column] = addl_curtailment[r].reindex(targets.index).fillna(0)
        elif column not in targets.columns:
            targets[column] = 0

    # Calculate new capacity
    new_solar_wind = targets.apply(
        calculate_added_capacity, result_type="expand", axis=1
    )

    # Add new capacity to targets dataframe
    new_solar_wind.columns = [f"{r}.added_capacity" for r in resources]
    targets = pd.concat([targets, new_solar_wind], axis=1)
    for r in resources:
        targets[f"{r}.next_capacity"] = (
            targets[f"{r}.prev_capacity"] + targets[f"{r}.added_capacity"]
        )

    return targets
def add_new_capacities_collaborative(
    input_targets, scenario_length, solar_fraction=None, addl_curtailment=None
):
    """Calculate new capacities based on a Collaborative strategy.

    All targets with a positive ``ce_target`` take part. Their combined shortfall
    is met with new solar and wind capacity, and the existing solar and wind
    capacity of every participating target is scaled by one common factor per
    resource. Non-participating targets keep their previous capacity.

    :param pandas.DataFrame input_targets: table of targets.
    :param int scenario_length: number of hours in new scenario.
    :param float/None solar_fraction: how much new capacity should be solar.
        If given None, maintain previous ratio.
    :param dict/None addl_curtailment: how much new curtailment is expected, by
        resource. If given None, assume zero. Resources left out of the dict are
        assumed to have zero additional curtailment.
    :return: (*pandas.DataFrame*) -- targets dataframe with next capacities added.
    :raises TypeError: if addl_curtailment is neither a dict nor None, or if
        solar_fraction is not an int, a float, or None.
    :raises ValueError: if addl_curtailment has keys other than "solar"/"wind",
        non-numeric values or values outside of [0, 1], or if solar_fraction is
        outside of [0, 1].
    """
    new_resources = {"solar", "wind"}

    # Validate inputs
    _check_solar_fraction(solar_fraction)
    if addl_curtailment is not None:
        if not isinstance(addl_curtailment, dict):
            raise TypeError("addl_curtailment must be supplied as a dict")
        # Check that only proper keys are supplied
        if not set(addl_curtailment.keys()) <= new_resources:
            raise ValueError(f"addl_curtailment keys are limited to {new_resources}")
        # Check that values are numbers between 0 and 1
        if not all(isinstance(x, (int, float)) for x in addl_curtailment.values()):
            raise ValueError("addl_curtailment values must be numeric")
        if any((x < 0) or (x > 1) for x in addl_curtailment.values()):
            raise ValueError("addl_curtailment must be between 0 and 1")
    supplied_curtailment = {} if addl_curtailment is None else addl_curtailment
    # Every resource gets an entry, so a partial dict cannot introduce NaN below
    curtailment_by_resource = pd.Series(
        {resource: supplied_curtailment.get(resource, 0) for resource in new_resources}
    )

    targets = input_targets.copy()
    is_participating = targets.ce_target > 0
    participating_targets = targets[is_participating]
    participating_capacity = pd.Series(
        {
            resource: participating_targets[f"{resource}.prev_capacity"].sum()
            for resource in new_resources
        }
    )
    participating_generation = pd.Series(
        {
            resource: participating_targets[f"{resource}.prev_generation"].sum()
            for resource in new_resources
        }
    )
    participating_cap_factor = participating_generation / (
        participating_capacity * scenario_length
    )
    expected_cf = participating_cap_factor * (1 - curtailment_by_resource)

    if solar_fraction is None:
        solar_fraction = participating_capacity["solar"] / participating_capacity.sum()
    avg_new_cf = (expected_cf["solar"] * solar_fraction) + (
        expected_cf["wind"] * (1 - solar_fraction)
    )

    overall_shortfall = calculate_overall_shortfall(input_targets, "collaborative")
    total_new_capacity = overall_shortfall / (avg_new_cf * scenario_length)
    new_type_capacity = pd.Series(
        {
            "solar": total_new_capacity * solar_fraction,
            "wind": total_new_capacity * (1 - solar_fraction),
        }
    )
    scaling_factors = 1 + new_type_capacity / participating_capacity

    for r in ("solar", "wind"):
        # Fill non-participating targets with previous capacity
        targets[f"{r}.next_capacity"] = targets[f"{r}.prev_capacity"]
        # Scale participating targets
        targets.loc[is_participating, f"{r}.next_capacity"] = (
            targets.loc[is_participating, f"{r}.prev_capacity"] * scaling_factors[r]
        )

    return targets
def create_change_table(input_targets, ref_scenario):
    """Using a reference scenario, create a change table which scales all
    plants in a base grid to capacities matching the reference grid, with
    the exception of wind and solar plants which are scaled up according to
    the clean capacity scaling logic.

    A zone scale factor is only written when the previous capacity is positive
    and the ratio next/previous differs from 1 by more than 1e-3.

    :param pandas.DataFrame input_targets: table of targets, with previous and
        next capacities. If an ``area_type`` column is present, non-null values
        are used when resolving a target to its load zones.
    :param powersimdata.scenario.scenario.Scenario ref_scenario: reference scenario
        to mimic.
    :return: (*dict*) -- dictionary to be passed to a change table.
    """
    epsilon = 1e-3
    base_grid = ref_scenario.get_base_grid()
    grid_zones = set(base_grid.plant.zone_name.unique())
    ref_grid = ref_scenario.get_grid()
    ct = mimic_generation_capacity(base_grid, ref_grid)
    has_area_type = "area_type" in input_targets.columns

    for region in input_targets.index:
        # Resolve zones the same way the targets were mapped to zones
        area_type = input_targets.loc[region, "area_type"] if has_area_type else None
        if pd.isnull(area_type):
            zone_names = base_grid.model_immutables.area_to_loadzone(region)
        else:
            zone_names = base_grid.model_immutables.area_to_loadzone(
                region, area_type=area_type
            )
        # Only zones that actually have plants in the base grid are scaled
        zone_ids = [base_grid.zone2id[n] for n in zone_names if n in grid_zones]

        for resource in ("solar", "wind"):
            prev_capacity = input_targets.loc[region, f"{resource}.prev_capacity"]
            next_capacity = input_targets.loc[region, f"{resource}.next_capacity"]
            # Nothing to scale from (also skips NaN)
            if not prev_capacity > 0:
                continue
            scale = next_capacity / prev_capacity
            if abs(scale - 1) > epsilon:
                for zone_id in zone_ids:
                    _apply_zone_scale_factor_to_ct(ct, resource, zone_id, scale)

    return ct
def calculate_clean_capacity_scaling(
    ref_scenario,
    method,
    targets=None,
    targets_filename=None,
    addl_curtailment=None,
    next_scenario=None,
    solar_fraction=None,
):
    """Given a reference scenario (to get 'baseline' values), a method, and a set
    of targets (either via a dataframe or a filename to load a dataframe),
    calculate capacities for a new scenario to meet the calculated shortfall.

    :param powersimdata.scenario.scenario.Scenario ref_scenario: Scenario instance
        to get baseline capacities and capacity factors from.
    :param str method: which capacity scaling method to use.
    :param pandas.DataFrame/None targets: a dataframe of targets, containing
        appropriate columns.
    :param str/None targets_filename: a filepath to a CSV file of targets,
        containing appropriate columns.
    :param dict/pandas.DataFrame/None addl_curtailment: additional expected
        curtailment, either by resource (for method == 'collaborative'), or by
        target/resource (for method == 'independent').
    :param powersimdata.scenario.scenario.Scenario/None next_scenario: a Scenario
        to plan for, using this Scenario's length and demand to determine capacity
        additions. If None, the reference scenario's length and demand are used.
    :param float/None solar_fraction: the fraction of new capacity to be solar,
        for method == 'collaborative' only. For method == 'independent', these
        values are specified in the targets table.
    :return: (*pandas.DataFrame*) -- dataframe of targets including new capacities,
        plus intermediate values used in calculation.
    :raises TypeError: if ``ref_scenario`` is not a Scenario object.
        if both ``targets`` and ``targets_filename`` are None or set.
        if ``targets`` is not a pandas.DataFrame.
        if ``addl_curtailment`` is neither a dict nor a pandas.DataFrame for
        method == 'independent'.
    :raises ValueError: if ``ref_scenario`` is not in the analyze state.
        if ``method`` is incorrectly set.
        if ``addl_curtailment`` holds values outside of [0, 1] for
        method == 'independent'.
    """
    allowed_methods = {"independent", "collaborative"}

    # Input validation
    if not isinstance(ref_scenario, Scenario):
        raise TypeError("ref_scenario must be a Scenario object")
    # The state name lives on the state object, as read elsewhere in this module
    if ref_scenario.state.name != "analyze":
        raise ValueError("ref_scenario must be in Analyze state")
    if method not in allowed_methods:
        raise ValueError(f"method must be one of: {allowed_methods}")
    if targets is None and targets_filename is None:
        raise TypeError("One of targets or targets_filename must be given")
    if targets is not None and targets_filename is not None:
        raise TypeError("targets and targets_filename cannot both be given")
    if targets is not None and not isinstance(targets, pd.DataFrame):
        raise TypeError("targets must be a pandas.DataFrame")
    if targets_filename is not None:
        targets = load_targets_from_csv(targets_filename)

    # Add extra information to targets: resources always come from the reference
    # scenario, demand and length from the scenario being planned for
    planning_scenario = ref_scenario if next_scenario is None else next_scenario
    targets = add_resource_data_to_targets(targets, ref_scenario)
    targets = add_demand_to_targets(targets, planning_scenario)
    next_scenario_length = _get_scenario_length(planning_scenario)
    targets = add_shortfall_to_targets(targets)

    # Calculate new capacities
    if method == "independent":
        if addl_curtailment is not None:
            if not isinstance(addl_curtailment, (dict, pd.DataFrame)):
                raise TypeError("addl_curtailment must be dict or pandas.DataFrame")
            if isinstance(addl_curtailment, dict):
                addl_curtailment = pd.DataFrame.from_dict(addl_curtailment)
            # Orient as targets (rows) by resources (columns)
            if set(addl_curtailment.columns) <= set(targets.index):
                addl_curtailment = addl_curtailment.transpose()
            if (addl_curtailment < 0).to_numpy().any():
                raise ValueError("addl_curtailment contains negative values")
            if (addl_curtailment > 1).to_numpy().any():
                raise ValueError("addl_curtailment contains values > 1")
        targets = add_new_capacities_independent(
            targets, next_scenario_length, addl_curtailment
        )
    elif method == "collaborative":
        targets = add_new_capacities_collaborative(
            targets, next_scenario_length, solar_fraction, addl_curtailment
        )

    return targets